diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/CreateObservabilityObjectAction.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/CreateObservabilityObjectAction.kt new file mode 100644 index 000000000..acb1c15a6 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/CreateObservabilityObjectAction.kt @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.observability.action + +import org.opensearch.action.ActionType +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.commons.authuser.User +import org.opensearch.observability.model.CreateObservabilityObjectRequest +import org.opensearch.observability.model.CreateObservabilityObjectResponse +import org.opensearch.transport.TransportService + +/** + * Create ObservabilityObject transport action + */ +internal class CreateObservabilityObjectAction @Inject constructor( + transportService: TransportService, + client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry +) : PluginBaseAction( + NAME, + transportService, + client, + actionFilters, + ::CreateObservabilityObjectRequest +) { + companion object { + private const val NAME = "cluster:admin/opensearch/observability/create" + internal val ACTION_TYPE = ActionType(NAME, ::CreateObservabilityObjectResponse) + } + + /** + * {@inheritDoc} + */ + override fun executeRequest( + request: CreateObservabilityObjectRequest, + user: User? + ): CreateObservabilityObjectResponse { + return ObservabilityActions.create(request, user) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/DeleteObservabilityObjectAction.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/DeleteObservabilityObjectAction.kt new file mode 100644 index 000000000..2294ec567 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/DeleteObservabilityObjectAction.kt @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.opensearch.observability.action + +import org.opensearch.action.ActionType +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.commons.authuser.User +import org.opensearch.observability.model.DeleteObservabilityObjectRequest +import org.opensearch.observability.model.DeleteObservabilityObjectResponse +import org.opensearch.transport.TransportService + +/** + * Delete ObservabilityObject transport action + */ +internal class DeleteObservabilityObjectAction @Inject constructor( + transportService: TransportService, + client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry +) : PluginBaseAction( + NAME, + transportService, + client, + actionFilters, + ::DeleteObservabilityObjectRequest +) { + companion object { + private const val NAME = "cluster:admin/opensearch/observability/delete" + internal val ACTION_TYPE = ActionType(NAME, ::DeleteObservabilityObjectResponse) + } + + /** + * {@inheritDoc} + */ + override fun executeRequest(request: DeleteObservabilityObjectRequest, user: User?): DeleteObservabilityObjectResponse { + return ObservabilityActions.delete(request, user) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/GetObservabilityObjectAction.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/GetObservabilityObjectAction.kt new file mode 100644 index 000000000..f549e8bf7 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/GetObservabilityObjectAction.kt @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.opensearch.observability.action + +import org.opensearch.action.ActionType +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.commons.authuser.User +import org.opensearch.observability.model.GetObservabilityObjectRequest +import org.opensearch.observability.model.GetObservabilityObjectResponse +import org.opensearch.transport.TransportService + +/** + * Get ObservabilityObject transport action + */ +internal class GetObservabilityObjectAction @Inject constructor( + transportService: TransportService, + client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry +) : PluginBaseAction( + NAME, + transportService, + client, + actionFilters, + ::GetObservabilityObjectRequest +) { + companion object { + private const val NAME = "cluster:admin/opensearch/observability/get" + internal val ACTION_TYPE = ActionType(NAME, ::GetObservabilityObjectResponse) + } + + /** + * {@inheritDoc} + */ + override fun executeRequest(request: GetObservabilityObjectRequest, user: User?): GetObservabilityObjectResponse { + return ObservabilityActions.get(request, user) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/ObservabilityActions.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/ObservabilityActions.kt new file mode 100644 index 000000000..c8bda7699 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/ObservabilityActions.kt @@ -0,0 +1,304 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.opensearch.observability.action + +import org.opensearch.OpenSearchStatusException +import org.opensearch.commons.authuser.User +import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX +import org.opensearch.observability.index.ObservabilityIndex +import org.opensearch.observability.model.CreateObservabilityObjectRequest +import org.opensearch.observability.model.CreateObservabilityObjectResponse +import org.opensearch.observability.model.DeleteObservabilityObjectRequest +import org.opensearch.observability.model.DeleteObservabilityObjectResponse +import org.opensearch.observability.model.GetObservabilityObjectRequest +import org.opensearch.observability.model.GetObservabilityObjectResponse +import org.opensearch.observability.model.ObservabilityObjectDoc +import org.opensearch.observability.model.ObservabilityObjectSearchResult +import org.opensearch.observability.model.UpdateObservabilityObjectRequest +import org.opensearch.observability.model.UpdateObservabilityObjectResponse +import org.opensearch.observability.security.UserAccessManager +import org.opensearch.observability.util.logger +import org.opensearch.rest.RestStatus +import java.time.Instant + +/** + * ObservabilityObject index operation actions. + */ +internal object ObservabilityActions { + private val log by logger(ObservabilityActions::class.java) + + /** + * Create new ObservabilityObject + * @param request [CreateObservabilityObjectRequest] object + * @return [CreateObservabilityObjectResponse] + */ + fun create(request: CreateObservabilityObjectRequest, user: User?): CreateObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-create") + UserAccessManager.validateUser(user) + val currentTime = Instant.now() + val objectDoc = ObservabilityObjectDoc( + "ignore", + currentTime, + currentTime, + UserAccessManager.getUserTenant(user), + UserAccessManager.getAllAccessInfo(user), + request.type, + request.objectData + ) + val docId = ObservabilityIndex.createObservabilityObject(objectDoc, request.objectId) + docId ?: throw OpenSearchStatusException( + "ObservabilityObject Creation failed", + RestStatus.INTERNAL_SERVER_ERROR + ) + return CreateObservabilityObjectResponse(docId) + } + + /** + * Update ObservabilityObject + * @param request [UpdateObservabilityObjectRequest] object + * @return [UpdateObservabilityObjectResponse] + */ + fun update(request: UpdateObservabilityObjectRequest, user: User?): UpdateObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-update ${request.objectId}") + UserAccessManager.validateUser(user) + val observabilityObject = ObservabilityIndex.getObservabilityObject(request.objectId) + observabilityObject + ?: throw OpenSearchStatusException( + "ObservabilityObject ${request.objectId} not found", + RestStatus.NOT_FOUND + ) + val currentDoc = observabilityObject.observabilityObjectDoc + if (!UserAccessManager.doesUserHasAccess(user, currentDoc.tenant, currentDoc.access)) { + throw OpenSearchStatusException( + "Permission denied for ObservabilityObject ${request.objectId}", + RestStatus.FORBIDDEN + ) + } + val currentTime = Instant.now() + val objectDoc = ObservabilityObjectDoc( + request.objectId, + currentTime, + currentDoc.createdTime, + UserAccessManager.getUserTenant(user), + UserAccessManager.getAllAccessInfo(user), + request.type, + request.objectData + ) + if (!ObservabilityIndex.updateObservabilityObject(request.objectId, objectDoc)) { + throw OpenSearchStatusException("ObservabilityObject Update failed", RestStatus.INTERNAL_SERVER_ERROR) + } + return UpdateObservabilityObjectResponse(request.objectId) + } + + /** + * Get ObservabilityObject info + * @param request [GetObservabilityObjectRequest] object + * @return [GetObservabilityObjectResponse] + */ + fun get(request: GetObservabilityObjectRequest, user: User?): GetObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-get ${request.objectIds}") + UserAccessManager.validateUser(user) + return when (request.objectIds.size) { + 0 -> getAll(request, user) + 1 -> info(request.objectIds.first(), user) + else -> info(request.objectIds, user) + } + } + + /** + * Get ObservabilityObject info + * @param objectId object id + * @param user the user info object + * @return [GetObservabilityObjectResponse] + */ + private fun info(objectId: String, user: User?): GetObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-info $objectId") + val observabilityObjectDocInfo = ObservabilityIndex.getObservabilityObject(objectId) + observabilityObjectDocInfo + ?: run { + throw OpenSearchStatusException("ObservabilityObject $objectId not found", RestStatus.NOT_FOUND) + } + val currentDoc = observabilityObjectDocInfo.observabilityObjectDoc + if (!UserAccessManager.doesUserHasAccess(user, currentDoc.tenant, currentDoc.access)) { + throw OpenSearchStatusException("Permission denied for ObservabilityObject $objectId", RestStatus.FORBIDDEN) + } + val docInfo = ObservabilityObjectDoc( + objectId, + currentDoc.updatedTime, + currentDoc.createdTime, + currentDoc.tenant, + currentDoc.access, + currentDoc.type, + currentDoc.objectData + ) + return GetObservabilityObjectResponse( + ObservabilityObjectSearchResult(docInfo), + UserAccessManager.hasAllInfoAccess(user) + ) + } + + /** + * Get ObservabilityObject info + * @param objectIds object id set + * @param user the user info object + * @return [GetObservabilityObjectResponse] + */ + private fun info(objectIds: Set, user: User?): GetObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-info $objectIds") + val objectDocs = ObservabilityIndex.getObservabilityObjects(objectIds) + if (objectDocs.size != objectIds.size) { + val mutableSet = objectIds.toMutableSet() + objectDocs.forEach { mutableSet.remove(it.id) } + throw OpenSearchStatusException( + "ObservabilityObject $mutableSet not found", + RestStatus.NOT_FOUND + ) + } + objectDocs.forEach { + val currentDoc = it.observabilityObjectDoc + if (!UserAccessManager.doesUserHasAccess(user, currentDoc.tenant, currentDoc.access)) { + throw OpenSearchStatusException( + "Permission denied for ObservabilityObject ${it.id}", + RestStatus.FORBIDDEN + ) + } + } + val configSearchResult = objectDocs.map { + ObservabilityObjectDoc( + it.id!!, + it.observabilityObjectDoc.updatedTime, + it.observabilityObjectDoc.createdTime, + it.observabilityObjectDoc.tenant, + it.observabilityObjectDoc.access, + it.observabilityObjectDoc.type, + it.observabilityObjectDoc.objectData + ) + } + return GetObservabilityObjectResponse( + ObservabilityObjectSearchResult(configSearchResult), + UserAccessManager.hasAllInfoAccess(user) + ) + } + + /** + * Get all ObservabilityObject matching the criteria + * @param request [GetObservabilityObjectRequest] object + * @param user the user info object + * @return [GetObservabilityObjectResponse] + */ + private fun getAll(request: GetObservabilityObjectRequest, user: User?): GetObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-getAll") + val searchResult = ObservabilityIndex.getAllObservabilityObjects( + UserAccessManager.getUserTenant(user), + UserAccessManager.getSearchAccessInfo(user), + request + ) + return GetObservabilityObjectResponse(searchResult, UserAccessManager.hasAllInfoAccess(user)) + } + + /** + * Delete ObservabilityObject + * @param request [DeleteObservabilityObjectRequest] object + * @param user the user info object + * @return [DeleteObservabilityObjectResponse] + */ + fun delete(request: DeleteObservabilityObjectRequest, user: User?): DeleteObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-delete ${request.objectIds}") + return if (request.objectIds.size == 1) { + delete(request.objectIds.first(), user) + } else { + delete(request.objectIds, user) + } + } + + /** + * Delete by object id + * + * @param objectId + * @param user + * @return [DeleteObservabilityObjectResponse] + */ + private fun delete(objectId: String, user: User?): DeleteObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-delete $objectId") + UserAccessManager.validateUser(user) + val observabilityObjectDocInfo = ObservabilityIndex.getObservabilityObject(objectId) + observabilityObjectDocInfo + ?: run { + throw OpenSearchStatusException( + "ObservabilityObject $objectId not found", + RestStatus.NOT_FOUND + ) + } + + val currentDoc = observabilityObjectDocInfo.observabilityObjectDoc + if (!UserAccessManager.doesUserHasAccess(user, currentDoc.tenant, currentDoc.access)) { + throw OpenSearchStatusException( + "Permission denied for ObservabilityObject $objectId", + RestStatus.FORBIDDEN + ) + } + if (!ObservabilityIndex.deleteObservabilityObject(objectId)) { + throw OpenSearchStatusException( + "ObservabilityObject $objectId delete failed", + RestStatus.REQUEST_TIMEOUT + ) + } + return DeleteObservabilityObjectResponse(mapOf(Pair(objectId, RestStatus.OK))) + } + + /** + * Delete ObservabilityObject + * @param objectIds ObservabilityObject object ids + * @param user the user info object + * @return [DeleteObservabilityObjectResponse] + */ + private fun delete(objectIds: Set, user: User?): DeleteObservabilityObjectResponse { + log.info("$LOG_PREFIX:ObservabilityObject-delete $objectIds") + UserAccessManager.validateUser(user) + val configDocs = ObservabilityIndex.getObservabilityObjects(objectIds) + if (configDocs.size != objectIds.size) { + val mutableSet = objectIds.toMutableSet() + configDocs.forEach { mutableSet.remove(it.id) } + throw OpenSearchStatusException( + "ObservabilityObject $mutableSet not found", + RestStatus.NOT_FOUND + ) + } + configDocs.forEach { + val currentDoc = it.observabilityObjectDoc + if (!UserAccessManager.doesUserHasAccess(user, currentDoc.tenant, currentDoc.access)) { + throw OpenSearchStatusException( + "Permission denied for ObservabilityObject ${it.id}", + RestStatus.FORBIDDEN + ) + } + } + val deleteStatus = ObservabilityIndex.deleteObservabilityObjects(objectIds) + return DeleteObservabilityObjectResponse(deleteStatus) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/PluginBaseAction.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/PluginBaseAction.kt new file mode 100644 index 000000000..25e302260 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/PluginBaseAction.kt @@ -0,0 +1,122 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.opensearch.observability.action + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.client.Client +import org.opensearch.common.io.stream.Writeable +import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT +import org.opensearch.commons.authuser.User +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.indices.InvalidIndexNameException +import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX +import org.opensearch.observability.util.logger +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.io.IOException + +abstract class PluginBaseAction( + name: String, + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + requestReader: Writeable.Reader +) : HandledTransportAction(name, transportService, actionFilters, requestReader) { + companion object { + private val log by logger(PluginBaseAction::class.java) + private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + } + + /** + * {@inheritDoc} + */ + @Suppress("TooGenericExceptionCaught") + override fun doExecute( + task: Task?, + request: Request, + listener: ActionListener + ) { + val userStr: String? = client.threadPool().threadContext.getTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + val user: User? = User.parse(userStr) + scope.launch { + try { + listener.onResponse(executeRequest(request, user)) + } catch (exception: OpenSearchStatusException) { + log.warn("$LOG_PREFIX:OpenSearchStatusException: message:${exception.message}") + listener.onFailure(exception) + } catch (exception: OpenSearchSecurityException) { + log.warn("$LOG_PREFIX:OpenSearchSecurityException:", exception) + listener.onFailure( + OpenSearchStatusException( + "Permissions denied: ${exception.message} - Contact administrator", + RestStatus.FORBIDDEN + ) + ) + } catch (exception: VersionConflictEngineException) { + log.warn("$LOG_PREFIX:VersionConflictEngineException:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.CONFLICT)) + } catch (exception: IndexNotFoundException) { + log.warn("$LOG_PREFIX:IndexNotFoundException:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.NOT_FOUND)) + } catch (exception: InvalidIndexNameException) { + log.warn("$LOG_PREFIX:InvalidIndexNameException:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.BAD_REQUEST)) + } catch (exception: IllegalArgumentException) { + log.warn("$LOG_PREFIX:IllegalArgumentException:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.BAD_REQUEST)) + } catch (exception: IllegalStateException) { + log.warn("$LOG_PREFIX:IllegalStateException:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.SERVICE_UNAVAILABLE)) + } catch (exception: IOException) { + log.error("$LOG_PREFIX:Uncaught IOException:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.FAILED_DEPENDENCY)) + } catch (exception: Exception) { + log.error("$LOG_PREFIX:Uncaught Exception:", exception) + listener.onFailure(OpenSearchStatusException(exception.message, RestStatus.INTERNAL_SERVER_ERROR)) + } + } + } + + /** + * Execute the transport request + * @param request the request to execute + * @return the response to return. + */ + abstract fun executeRequest(request: Request, user: User?): Response +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/UpdateObservabilityObjectAction.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/UpdateObservabilityObjectAction.kt new file mode 100644 index 000000000..315758fba --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/action/UpdateObservabilityObjectAction.kt @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.opensearch.observability.action + +import org.opensearch.action.ActionType +import org.opensearch.action.support.ActionFilters +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.commons.authuser.User +import org.opensearch.observability.model.UpdateObservabilityObjectRequest +import org.opensearch.observability.model.UpdateObservabilityObjectResponse +import org.opensearch.transport.TransportService + +/** + * Update ObservabilityObject transport action + */ +internal class UpdateObservabilityObjectAction @Inject constructor( + transportService: TransportService, + client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry +) : PluginBaseAction( + NAME, + transportService, + client, + actionFilters, + ::UpdateObservabilityObjectRequest +) { + companion object { + private const val NAME = "cluster:admin/opensearch/observability/update" + internal val ACTION_TYPE = ActionType(NAME, ::UpdateObservabilityObjectResponse) + } + + /** + * {@inheritDoc} + */ + override fun executeRequest(request: UpdateObservabilityObjectRequest, user: User?): UpdateObservabilityObjectResponse { + return ObservabilityActions.update(request, user) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt new file mode 100644 index 000000000..45e487acf --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt @@ -0,0 +1,364 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package org.opensearch.observability.index + +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.ResourceNotFoundException +import org.opensearch.action.DocWriteResponse +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.get.MultiGetRequest +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.update.UpdateRequest +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentType +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.ReindexAction +import org.opensearch.index.reindex.ReindexRequestBuilder +import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX +import org.opensearch.observability.model.GetObservabilityObjectRequest +import org.opensearch.observability.model.ObservabilityObjectDoc +import org.opensearch.observability.model.ObservabilityObjectDocInfo +import org.opensearch.observability.model.ObservabilityObjectSearchResult +import org.opensearch.observability.model.RestTag.ACCESS_LIST_FIELD +import org.opensearch.observability.model.RestTag.TENANT_FIELD +import org.opensearch.observability.model.SearchResults +import org.opensearch.observability.settings.PluginSettings +import org.opensearch.observability.util.SecureIndexClient +import org.opensearch.observability.util.logger +import org.opensearch.rest.RestStatus +import org.opensearch.search.SearchHit +import org.opensearch.search.builder.SearchSourceBuilder +import java.util.concurrent.TimeUnit + +/** + * Class for doing OpenSearch index operation to maintain observability objects in cluster. + */ +@Suppress("TooManyFunctions") +internal object ObservabilityIndex { + private val log by logger(ObservabilityIndex::class.java) + private const val INDEX_NAME = ".opensearch-observability" + private const val NOTEBOOKS_INDEX_NAME = ".opensearch-notebooks" + private const val OBSERVABILITY_MAPPING_FILE_NAME = "observability-mapping.yml" + private const val OBSERVABILITY_SETTINGS_FILE_NAME = "observability-settings.yml" + private const val MAPPING_TYPE = "_doc" + + private lateinit var client: Client + private lateinit var clusterService: ClusterService + + private val searchHitParser = object : SearchResults.SearchHitParser { + override fun parse(searchHit: SearchHit): ObservabilityObjectDoc { + val parser = XContentType.JSON.xContent().createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + searchHit.sourceAsString + ) + parser.nextToken() + return ObservabilityObjectDoc.parse(parser, searchHit.id) + } + } + + /** + * Initialize the class + * @param client The OpenSearch client + * @param clusterService The OpenSearch cluster service + */ + fun initialize(client: Client, clusterService: ClusterService) { + this.client = SecureIndexClient(client) + this.clusterService = clusterService + } + + /** + * Create index using the mapping and settings defined in resource + * If .opensearch-notebooks index exists, reindex it to .opensearch-observability index and remove it + */ + @Suppress("TooGenericExceptionCaught") + private fun createIndex() { + if (!isIndexExists(INDEX_NAME)) { + val classLoader = ObservabilityIndex::class.java.classLoader + val indexMappingSource = classLoader.getResource(OBSERVABILITY_MAPPING_FILE_NAME)?.readText()!! + val indexSettingsSource = classLoader.getResource(OBSERVABILITY_SETTINGS_FILE_NAME)?.readText()!! + val request = CreateIndexRequest(INDEX_NAME) + .mapping(MAPPING_TYPE, indexMappingSource, XContentType.YAML) + .settings(indexSettingsSource, XContentType.YAML) + try { + val actionFuture = client.admin().indices().create(request) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + if (response.isAcknowledged) { + log.info("$LOG_PREFIX:Index $INDEX_NAME creation Acknowledged") + } else { + throw IllegalStateException("$LOG_PREFIX:Index $INDEX_NAME creation not Acknowledged") + } + } catch (exception: Exception) { + if (exception !is ResourceAlreadyExistsException && exception.cause !is ResourceAlreadyExistsException) { + throw exception + } + } + } + if (isIndexExists(NOTEBOOKS_INDEX_NAME)) { + try { + log.info("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME to $INDEX_NAME") + val reindexResponse = ReindexRequestBuilder(client, ReindexAction.INSTANCE) + .source(NOTEBOOKS_INDEX_NAME) + .destination(INDEX_NAME) + .abortOnVersionConflict(false) + .refresh(true) + .get() + if (reindexResponse.isTimedOut) { + throw IllegalStateException("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME timed out") + } else if (reindexResponse.searchFailures.isNotEmpty()) { + throw IllegalStateException("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME failed with searchFailures") + } else if (reindexResponse.bulkFailures.isNotEmpty()) { + throw IllegalStateException("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME failed with bulkFailures") + } + + log.info("$LOG_PREFIX:Index - ${reindexResponse.total} docs reindexed to $INDEX_NAME") + val deleteIndexRequest = DeleteIndexRequest(NOTEBOOKS_INDEX_NAME) + val actionFuture = client.admin().indices().delete(deleteIndexRequest) + val deleteIndexResponse = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + if (deleteIndexResponse.isAcknowledged) { + log.info("$LOG_PREFIX:Index $INDEX_NAME deletion Acknowledged") + } else { + throw IllegalStateException("$LOG_PREFIX:Index $NOTEBOOKS_INDEX_NAME deletion not Acknowledged") + } + } catch (exception: Exception) { + if (exception !is ResourceNotFoundException && exception.cause !is ResourceNotFoundException) { + throw exception + } + } + } + } + + /** + * Check if the index is created and available. + * @param index + * @return true if index is available, false otherwise + */ + private fun isIndexExists(index: String): Boolean { + val clusterState = clusterService.state() + return clusterState.routingTable.hasIndex(index) + } + + /** + * Create observability object + * + * @param observabilityObjectDoc + * @param id + * @return object id if successful, otherwise null + */ + fun createObservabilityObject(observabilityObjectDoc: ObservabilityObjectDoc, id: String? = null): String? { + createIndex() + val xContent = observabilityObjectDoc.toXContent() + val indexRequest = IndexRequest(INDEX_NAME) + .source(xContent) + .create(true) + if (id != null) { + indexRequest.id(id) + } + val actionFuture = client.index(indexRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + return if (response.result != DocWriteResponse.Result.CREATED) { + log.warn("$LOG_PREFIX:createObservabilityObject - response:$response") + null + } else { + response.id + } + } + + /** + * Get observability object + * + * @param id + * @return [ObservabilityObjectDocInfo] + */ + fun getObservabilityObject(id: String): ObservabilityObjectDocInfo? { + createIndex() + val getRequest = GetRequest(INDEX_NAME).id(id) + val actionFuture = client.get(getRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + return parseObservabilityObjectDoc(id, response) + } + + /** + * Get multiple observability objects + * + * @param ids + * @return list of [ObservabilityObjectDocInfo] + */ + fun getObservabilityObjects(ids: Set): List { + createIndex() + val getRequest = MultiGetRequest() + ids.forEach { getRequest.add(INDEX_NAME, it) } + val actionFuture = client.multiGet(getRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + return response.responses.mapNotNull { parseObservabilityObjectDoc(it.id, it.response) } + } + + /** + * Parse observability object doc + * + * @param id + * @param response + * @return [ObservabilityObjectDocInfo] + */ + private fun parseObservabilityObjectDoc(id: String, response: GetResponse): ObservabilityObjectDocInfo? { + return if (response.sourceAsString == null) { + log.warn("$LOG_PREFIX:getObservabilityObject - $id not found; response:$response") + null + } else { + val parser = XContentType.JSON.xContent().createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.sourceAsString + ) + parser.nextToken() + val doc = ObservabilityObjectDoc.parse(parser, id) + ObservabilityObjectDocInfo(id, response.version, response.seqNo, response.primaryTerm, doc) + } + } + + /** + * Get all observability objects + * + * @param tenant + * @param access + * @param request + * @return [ObservabilityObjectSearchResult] + */ + fun getAllObservabilityObjects( + tenant: String, + access: List, + request: GetObservabilityObjectRequest + ): ObservabilityObjectSearchResult { + createIndex() + val queryHelper = ObservabilityQueryHelper(request.types) + val sourceBuilder = SearchSourceBuilder() + .timeout(TimeValue(PluginSettings.operationTimeoutMs, TimeUnit.MILLISECONDS)) + .size(request.maxItems) + .from(request.fromIndex) + queryHelper.addSortField(sourceBuilder, request.sortField, request.sortOrder) + + val query = QueryBuilders.boolQuery() + query.filter(QueryBuilders.termsQuery(TENANT_FIELD, tenant)) + if (access.isNotEmpty()) { + query.filter(QueryBuilders.termsQuery(ACCESS_LIST_FIELD, access)) + } + queryHelper.addTypeFilters(query) + queryHelper.addQueryFilters(query, request.filterParams) + sourceBuilder.query(query) + val searchRequest = SearchRequest() + .indices(INDEX_NAME) + .source(sourceBuilder) + val actionFuture = client.search(searchRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + val result = ObservabilityObjectSearchResult(request.fromIndex.toLong(), response, searchHitParser) + log.info( + "$LOG_PREFIX:getAllObservabilityObjects types:${request.types} from:${request.fromIndex}, maxItems:${request.maxItems}," + + " sortField:${request.sortField}, sortOrder=${request.sortOrder}, filters=${request.filterParams}" + + " retCount:${result.objectList.size}, totalCount:${result.totalHits}" + ) + return result + } + + /** + * Update observability object + * + * @param id + * @param observabilityObjectDoc + * @return true if successful, otherwise false + */ + fun updateObservabilityObject(id: String, observabilityObjectDoc: ObservabilityObjectDoc): Boolean { + createIndex() + val updateRequest = UpdateRequest() + .index(INDEX_NAME) + .id(id) + .doc(observabilityObjectDoc.toXContent()) + .fetchSource(true) + val actionFuture = client.update(updateRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + if (response.result != DocWriteResponse.Result.UPDATED) { + log.warn("$LOG_PREFIX:updateObservabilityObject failed for $id; response:$response") + } + return response.result == DocWriteResponse.Result.UPDATED + } + + /** + * Delete observability object + * + * @param id + * @return true if successful, otherwise false + */ + fun deleteObservabilityObject(id: String): Boolean { + createIndex() + val deleteRequest = DeleteRequest() + .index(INDEX_NAME) + .id(id) + val actionFuture = client.delete(deleteRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + if (response.result != DocWriteResponse.Result.DELETED) { + log.warn("$LOG_PREFIX:deleteObservabilityObject failed for $id; response:$response") + } + return response.result == DocWriteResponse.Result.DELETED + } + + /** + * Delete multiple observability objects + * + * @param ids + * @return map of id to delete status + */ + fun deleteObservabilityObjects(ids: Set): Map { + createIndex() + val bulkRequest = BulkRequest() + ids.forEach { + val deleteRequest = DeleteRequest() + .index(INDEX_NAME) + .id(it) + bulkRequest.add(deleteRequest) + } + val actionFuture = client.bulk(bulkRequest) + val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) + val mutableMap = mutableMapOf() + response.forEach { + mutableMap[it.id] = it.status() + if (it.isFailed) { + log.warn("$LOG_PREFIX:deleteObservabilityObjects failed for ${it.id}; response:${it.failureMessage}") + } + } + return mutableMap + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityQueryHelper.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityQueryHelper.kt new file mode 100644 index 000000000..748572a95 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityQueryHelper.kt @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.observability.index + +import org.opensearch.OpenSearchStatusException +import org.opensearch.index.query.BoolQueryBuilder +import org.opensearch.index.query.Operator +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.QueryBuilders +import org.opensearch.observability.model.ObservabilityObjectType +import org.opensearch.observability.model.RestTag.CREATED_TIME_FIELD +import org.opensearch.observability.model.RestTag.NAME_FIELD +import org.opensearch.observability.model.RestTag.QUERY_FIELD +import org.opensearch.observability.model.RestTag.UPDATED_TIME_FIELD +import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.sort.SortOrder +import java.util.EnumSet + +/** + * Helper class for Get operations. + */ +internal class ObservabilityQueryHelper(private val types: EnumSet) { + companion object { + private val METADATA_RANGE_FIELDS = setOf( + UPDATED_TIME_FIELD, + CREATED_TIME_FIELD + ) + + // keyword and text fields are under observability object and should be prepended with type + private val KEYWORD_FIELDS: Set = setOf() + private val TEXT_FIELDS = setOf( + NAME_FIELD + ) + + private val METADATA_FIELDS = METADATA_RANGE_FIELDS + private val OBSERVABILITY_OBJECT_FIELDS = KEYWORD_FIELDS.union(TEXT_FIELDS) + private val ALL_FIELDS = METADATA_FIELDS.union(OBSERVABILITY_OBJECT_FIELDS) + + val FILTER_PARAMS = ALL_FIELDS.union(setOf(QUERY_FIELD)) + } + + private val prefixes = if (types.size == 0) { + ObservabilityObjectType.getAll() + } else { + types + } + + fun addSortField(sourceBuilder: SearchSourceBuilder, sortField: String?, sortOrder: SortOrder?) { + val order = sortOrder ?: SortOrder.ASC + if (sortField == null) { + sourceBuilder.sort(UPDATED_TIME_FIELD, order) + } else { + val fields = mutableListOf() + when { + METADATA_RANGE_FIELDS.contains(sortField) -> fields.add(sortField) + KEYWORD_FIELDS.contains(sortField) -> fields.addAll(types.map { "${it.tag}.$sortField" }) + TEXT_FIELDS.contains(sortField) -> fields.addAll(types.map { "${it.tag}.$sortField.keyword" }) + else -> throw OpenSearchStatusException("Field $sortField not acceptable", RestStatus.NOT_ACCEPTABLE) + } + fields.forEach { sourceBuilder.sort(it, order) } + } + } + + fun addTypeFilters(query: BoolQueryBuilder) { + if (types.size > 0) { + types.forEach { + query.should().add(QueryBuilders.existsQuery(it.tag)) + } + query.minimumShouldMatch(1) + } + } + + fun addQueryFilters(query: BoolQueryBuilder, filterParams: Map) { + filterParams.forEach { + when { + QUERY_FIELD == it.key -> query.filter(getQueryAllBuilder(it.value)) // all text search + METADATA_RANGE_FIELDS.contains(it.key) -> query.filter(getRangeQueryBuilder(it.key, it.value)) + KEYWORD_FIELDS.contains(it.key) -> addTermsQueryBuilder(query, it.key, it.value) + TEXT_FIELDS.contains(it.key) -> addMatchQueryBuilder(query, it.key, it.value) + else -> throw OpenSearchStatusException("Query on ${it.key} not acceptable", RestStatus.NOT_ACCEPTABLE) + } + } + } + + private fun getQueryAllBuilder(queryValue: String): QueryBuilder { + val allQuery = QueryBuilders.queryStringQuery(queryValue) + // Searching on metadata field is not supported. skip adding METADATA_FIELDS + OBSERVABILITY_OBJECT_FIELDS.forEach { + prefixes.forEach { type -> allQuery.field("$type.$it") } + } + return allQuery + } + + private fun getRangeQueryBuilder(queryKey: String, queryValue: String): QueryBuilder { + val range = queryValue.split("..") + return when (range.size) { + 1 -> QueryBuilders.termQuery(queryKey, queryValue) + 2 -> { + val rangeQuery = QueryBuilders.rangeQuery(queryKey) + rangeQuery.from(range[0]) + rangeQuery.to(range[1]) + rangeQuery + } + else -> { + throw OpenSearchStatusException( + "Invalid Range format $queryValue, allowed format 'exact' or 'from..to'", + RestStatus.NOT_ACCEPTABLE + ) + } + } + } + + private fun addTermQueryBuilder(query: BoolQueryBuilder, queryKey: String, queryValue: String) { + prefixes.forEach { query.filter(QueryBuilders.termQuery("${it.tag}.$queryKey", queryValue)) } + } + + private fun addTermsQueryBuilder(query: BoolQueryBuilder, queryKey: String, queryValue: String) { + prefixes.forEach { query.filter(QueryBuilders.termsQuery("${it.tag}.$queryKey", queryValue.split(","))) } + } + + private fun addMatchQueryBuilder(query: BoolQueryBuilder, queryKey: String, queryValue: String) { + prefixes.forEach { + query.should().add(QueryBuilders.matchQuery("${it.tag}.$queryKey", queryValue).operator(Operator.AND)) + } + query.minimumShouldMatch(1) + } +} diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/ObservabilityRestHandler.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/ObservabilityRestHandler.kt new file mode 100644 index 000000000..5a31a3fa9 --- /dev/null +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/resthandler/ObservabilityRestHandler.kt @@ -0,0 +1,251 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ +package org.opensearch.observability.resthandler + +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.utils.logger +import org.opensearch.observability.ObservabilityPlugin.Companion.BASE_OBSERVABILITY_URI +import org.opensearch.observability.ObservabilityPlugin.Companion.LOG_PREFIX +import org.opensearch.observability.action.CreateObservabilityObjectAction +import org.opensearch.observability.action.DeleteObservabilityObjectAction +import org.opensearch.observability.action.GetObservabilityObjectAction +import org.opensearch.observability.action.ObservabilityActions +import org.opensearch.observability.action.UpdateObservabilityObjectAction +import org.opensearch.observability.index.ObservabilityQueryHelper +import org.opensearch.observability.model.CreateObservabilityObjectRequest +import org.opensearch.observability.model.DeleteObservabilityObjectRequest +import org.opensearch.observability.model.GetObservabilityObjectRequest +import org.opensearch.observability.model.ObservabilityObjectType +import org.opensearch.observability.model.RestTag.FROM_INDEX_FIELD +import org.opensearch.observability.model.RestTag.MAX_ITEMS_FIELD +import org.opensearch.observability.model.RestTag.OBJECT_ID_FIELD +import org.opensearch.observability.model.RestTag.OBJECT_ID_LIST_FIELD +import org.opensearch.observability.model.RestTag.OBJECT_TYPE_FIELD +import org.opensearch.observability.model.RestTag.SORT_FIELD_FIELD +import org.opensearch.observability.model.RestTag.SORT_ORDER_FIELD +import org.opensearch.observability.model.UpdateObservabilityObjectRequest +import org.opensearch.observability.settings.PluginSettings +import org.opensearch.observability.util.contentParserNextToken +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestRequest.Method.DELETE +import org.opensearch.rest.RestRequest.Method.GET +import org.opensearch.rest.RestRequest.Method.POST +import org.opensearch.rest.RestRequest.Method.PUT +import org.opensearch.rest.RestStatus +import org.opensearch.search.sort.SortOrder +import java.util.EnumSet + +/** + * Rest handler for observability object lifecycle management. + * This handler uses [ObservabilityActions]. + */ +internal class ObservabilityRestHandler : BaseRestHandler() { + companion object { + private const val OBSERVABILITY_ACTION = "observability_actions" + private const val OBSERVABILITY_URL = "$BASE_OBSERVABILITY_URI/object" + private val log by logger(ObservabilityRestHandler::class.java) + } + + /** + * {@inheritDoc} + */ + override fun getName(): String { + return OBSERVABILITY_ACTION + } + + /** + * {@inheritDoc} + */ + override fun routes(): List { + return listOf( + /** + * Create a new object + * Request URL: POST OBSERVABILITY_URL + * Request body: Ref [org.opensearch.observability.model.CreateObservabilityObjectRequest] + * Response body: Ref [org.opensearch.observability.model.CreateObservabilityObjectResponse] + */ + Route(POST, OBSERVABILITY_URL), + /** + * Update object + * Request URL: PUT OBSERVABILITY_URL/{objectId} + * Request body: Ref [org.opensearch.observability.model.UpdateObservabilityObjectRequest] + * Response body: Ref [org.opensearch.observability.model.UpdateObservabilityObjectResponse] + */ + Route(PUT, "$OBSERVABILITY_URL/{$OBJECT_ID_FIELD}"), + /** + * Get a object + * Request URL: GET OBSERVABILITY_URL/{objectId} + * Request body: Ref [org.opensearch.observability.model.GetObservabilityObjectRequest] + * Response body: Ref [org.opensearch.observability.model.GetObservabilityObjectResponse] + */ + Route(GET, "$OBSERVABILITY_URL/{$OBJECT_ID_FIELD}"), + Route(GET, OBSERVABILITY_URL), + /** + * Delete object + * Request URL: DELETE OBSERVABILITY_URL/{objectId} + * Request body: Ref [org.opensearch.observability.model.DeleteObservabilityObjectRequest] + * Response body: Ref [org.opensearch.observability.model.DeleteObservabilityObjectResponse] + */ + Route(DELETE, "$OBSERVABILITY_URL/{$OBJECT_ID_FIELD}"), + Route(DELETE, "$OBSERVABILITY_URL") + ) + } + + /** + * {@inheritDoc} + */ + override fun responseParams(): Set { + return setOf( + OBJECT_ID_FIELD, + OBJECT_ID_LIST_FIELD, + OBJECT_TYPE_FIELD, + SORT_FIELD_FIELD, + SORT_ORDER_FIELD, + FROM_INDEX_FIELD, + MAX_ITEMS_FIELD + ) + } + + private fun executePostRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + return RestChannelConsumer { + client.execute( + CreateObservabilityObjectAction.ACTION_TYPE, + CreateObservabilityObjectRequest.parse(request.contentParserNextToken()), + RestResponseToXContentListener(it) + ) + } + } + + private fun executePutRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + return RestChannelConsumer { + client.execute( + UpdateObservabilityObjectAction.ACTION_TYPE, + UpdateObservabilityObjectRequest.parse(request.contentParserNextToken(), request.param(OBJECT_ID_FIELD)), + RestResponseToXContentListener(it) + ) + } + } + + private fun executeGetRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val objectId: String? = request.param(OBJECT_ID_FIELD) + val objectIdListString: String? = request.param(OBJECT_ID_LIST_FIELD) + val objectIdList = getObjectIdSet(objectId, objectIdListString) + val types: EnumSet = getTypesSet(request.param(OBJECT_TYPE_FIELD)) + val sortField: String? = request.param(SORT_FIELD_FIELD) + val sortOrderString: String? = request.param(SORT_ORDER_FIELD) + val sortOrder: SortOrder? = if (sortOrderString == null) { + null + } else { + SortOrder.fromString(sortOrderString) + } + val fromIndex = request.param(FROM_INDEX_FIELD)?.toIntOrNull() ?: 0 + val maxItems = request.param(MAX_ITEMS_FIELD)?.toIntOrNull() ?: PluginSettings.defaultItemsQueryCount + val filterParams = request.params() + .filter { ObservabilityQueryHelper.FILTER_PARAMS.contains(it.key) } + .map { Pair(it.key, request.param(it.key)) } + .toMap() + log.info( + "$LOG_PREFIX:executeGetRequest idList:$objectIdList types:$types, from:$fromIndex, maxItems:$maxItems," + + " sortField:$sortField, sortOrder=$sortOrder, filters=$filterParams" + ) + return RestChannelConsumer { + client.execute( + GetObservabilityObjectAction.ACTION_TYPE, + GetObservabilityObjectRequest( + objectIdList, + types, + fromIndex, + maxItems, + sortField, + sortOrder, + filterParams + ), + RestResponseToXContentListener(it) + ) + } + } + + private fun executeDeleteRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val objectId: String? = request.param(OBJECT_ID_FIELD) + val objectIdSet: Set = + request.paramAsStringArray(OBJECT_ID_LIST_FIELD, arrayOf(objectId)) + .filter { s -> !s.isNullOrBlank() } + .toSet() + return RestChannelConsumer { + if (objectIdSet.isEmpty()) { + it.sendResponse( + BytesRestResponse( + RestStatus.BAD_REQUEST, + "either $OBJECT_ID_FIELD or $OBJECT_ID_LIST_FIELD is required" + ) + ) + } else { + client.execute( + DeleteObservabilityObjectAction.ACTION_TYPE, + DeleteObservabilityObjectRequest(objectIdSet), + RestResponseToXContentListener(it) + ) + } + } + } + + /** + * {@inheritDoc} + */ + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + return when (request.method()) { + POST -> executePostRequest(request, client) + PUT -> executePutRequest(request, client) + GET -> executeGetRequest(request, client) + DELETE -> executeDeleteRequest(request, client) + else -> RestChannelConsumer { + it.sendResponse(BytesRestResponse(RestStatus.METHOD_NOT_ALLOWED, "${request.method()} is not allowed")) + } + } + } + + private fun getObjectIdSet(objectId: String?, objectIdList: String?): Set { + var retIds: Set = setOf() + if (objectId != null) { + retIds = setOf(objectId) + } + if (objectIdList != null) { + retIds = objectIdList.split(",").union(retIds) + } + return retIds + } + + private fun getTypesSet(typesString: String?): EnumSet { + var types: EnumSet = EnumSet.noneOf(ObservabilityObjectType::class.java) + typesString?.split(",")?.forEach { types.add(ObservabilityObjectType.fromTagOrDefault(it)) } + return types + } +}