diff --git a/CHANGELOG.md b/CHANGELOG.md index a3a5ecc8097c3..935552c943087 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Supporting Scripted Metric Aggregation when reducing aggregations in InternalValueCount and InternalAvg ([18411](https://github.com/opensearch-project/OpenSearch/pull18411))) - Support `search_after` numeric queries with Approximation Framework ([#18896](https://github.com/opensearch-project/OpenSearch/pull/18896)) - Add skip_list parameter to Numeric Field Mappers (default false) ([#18889](https://github.com/opensearch-project/OpenSearch/pull/18889)) +- Map to proper GRPC status codes and achieve exception handling parity with HTTP APIs([#18925](https://github.com/opensearch-project/OpenSearch/pull/18925)) ### Changed - Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570)) diff --git a/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java b/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java index 5d5eeb41118c8..c795dadf671be 100644 --- a/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java +++ b/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java @@ -76,6 +76,38 @@ public final class ExceptionsHelper { private static final Logger logger = LogManager.getLogger(ExceptionsHelper.class); + /** + * Shared error message constants for consistent error handling across HTTP and gRPC protocols. + * These constants ensure that both REST API and gRPC API return identical error messages + * for the same types of exceptions. + */ + public static final class ErrorMessages { + /** Error message for invalid argument exceptions */ + public static final String INVALID_ARGUMENT = "Invalid argument"; + + /** Error message for JSON parsing failures */ + public static final String JSON_PARSE_FAILED = "Failed to parse JSON"; + + /** Error message for rate limiting / rejected execution */ + public static final String TOO_MANY_REQUESTS = "Too many requests"; + + /** Error message for JSON type coercion failures */ + public static final String JSON_COERCION_FAILED = "Incompatible JSON value"; + + /** Error message for content format issues */ + public static final String INVALID_CONTENT_FORMAT = "Invalid content format"; + + /** Error message for compression format issues */ + public static final String INVALID_COMPRESSION_FORMAT = "Invalid compression format"; + + /** Generic fallback error message for unknown exceptions */ + public static final String INTERNAL_FAILURE = "Internal failure"; + + private ErrorMessages() { + // Utility class, no instances + } + } + // utility class: no ctor private ExceptionsHelper() {} @@ -117,16 +149,16 @@ public static String summaryMessage(Throwable t) { if (t instanceof OpenSearchException) { return getExceptionSimpleClassName(t) + "[" + t.getMessage() + "]"; } else if (t instanceof IllegalArgumentException) { - return "Invalid argument"; + return ErrorMessages.INVALID_ARGUMENT; } else if (t instanceof InputCoercionException) { - return "Incompatible JSON value"; + return ErrorMessages.JSON_COERCION_FAILED; } else if (t instanceof JsonParseException) { - return "Failed to parse JSON"; + return ErrorMessages.JSON_PARSE_FAILED; } else if (t instanceof OpenSearchRejectedExecutionException) { - return "Too many requests"; + return ErrorMessages.TOO_MANY_REQUESTS; } } - return "Internal failure"; + return ErrorMessages.INTERNAL_FAILURE; } public static Throwable unwrapCause(Throwable t) { @@ -149,6 +181,25 @@ public static Throwable unwrapCause(Throwable t) { return result; } + /** + * Unwraps exception causes up to 10 levels looking for the first OpenSearchException. + * This method is used by both HTTP and gRPC error handling to ensure consistent exception + * unwrapping behavior across protocols. + * + * @param e The exception to unwrap + * @return The first OpenSearchException found in the cause chain, or the original exception if none found + */ + public static Throwable unwrapToOpenSearchException(Throwable e) { + Throwable t = e; + for (int counter = 0; counter < 10 && t != null; counter++) { + if (t instanceof OpenSearchException) { + break; + } + t = t.getCause(); + } + return t != null ? t : e; + } + /** * @deprecated Don't swallow exceptions, allow them to propagate. */ diff --git a/libs/core/src/main/java/org/opensearch/OpenSearchException.java b/libs/core/src/main/java/org/opensearch/OpenSearchException.java index 8f1f5c929d865..8fb343630338b 100644 --- a/libs/core/src/main/java/org/opensearch/OpenSearchException.java +++ b/libs/core/src/main/java/org/opensearch/OpenSearchException.java @@ -628,14 +628,8 @@ public static void generateFailureXContent(XContentBuilder builder, ToXContent.P // Render the exception with a simple message if (detailed == false) { - Throwable t = e; - for (int counter = 0; counter < 10 && t != null; counter++) { - if (t instanceof OpenSearchException) { - break; - } - t = t.getCause(); - } - builder.field(ERROR, ExceptionsHelper.summaryMessage(t != null ? t : e)); + Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e); + builder.field(ERROR, ExceptionsHelper.summaryMessage(unwrapped)); return; } diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java index aacf5f52e534f..6e1f6306c2aae 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/BulkRequestActionListener.java @@ -13,9 +13,11 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.core.action.ActionListener; import org.opensearch.transport.grpc.proto.response.document.bulk.BulkResponseProtoUtils; +import org.opensearch.transport.grpc.util.GrpcErrorHandler; import java.io.IOException; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; /** @@ -49,7 +51,9 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) { responseObserver.onNext(protoResponse); responseObserver.onCompleted(); } catch (RuntimeException | IOException e) { - responseObserver.onError(e); + logger.error("Failed to convert bulk response to protobuf: " + e.getMessage()); + StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e); + responseObserver.onError(grpcError); } } @@ -61,7 +65,8 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) { */ @Override public void onFailure(Exception e) { - logger.error("BulkRequestActionListener failed to process bulk request:" + e.getMessage()); - responseObserver.onError(e); + logger.error("BulkRequestActionListener failed to process bulk request: " + e.getMessage()); + StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e); + responseObserver.onError(grpcError); } } diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java index 306c57d91c480..7712d85a3a217 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/listeners/SearchRequestActionListener.java @@ -13,9 +13,11 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.core.action.ActionListener; import org.opensearch.transport.grpc.proto.response.search.SearchResponseProtoUtils; +import org.opensearch.transport.grpc.util.GrpcErrorHandler; import java.io.IOException; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; /** @@ -44,13 +46,16 @@ public void onResponse(SearchResponse response) { responseObserver.onNext(protoResponse); responseObserver.onCompleted(); } catch (RuntimeException | IOException e) { - responseObserver.onError(e); + logger.error("Failed to convert search response to protobuf: " + e.getMessage()); + StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e); + responseObserver.onError(grpcError); } } @Override public void onFailure(Exception e) { - logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage()); - responseObserver.onError(e); + logger.error("SearchRequestActionListener failed to process search request: " + e.getMessage()); + StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e); + responseObserver.onError(grpcError); } } diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/document/bulk/BulkItemResponseProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/document/bulk/BulkItemResponseProtoUtils.java index 000dfdd1de22f..5dcdeebd96631 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/document/bulk/BulkItemResponseProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/response/document/bulk/BulkItemResponseProtoUtils.java @@ -20,6 +20,7 @@ import org.opensearch.transport.grpc.proto.response.document.common.DocWriteResponseProtoUtils; import org.opensearch.transport.grpc.proto.response.document.get.GetResultProtoUtils; import org.opensearch.transport.grpc.proto.response.exceptions.opensearchexception.OpenSearchExceptionProtoUtils; +import org.opensearch.transport.grpc.util.RestToGrpcStatusConverter; import java.io.IOException; @@ -52,8 +53,8 @@ public static Item toProto(BulkItemResponse response) throws IOException { DocWriteResponse docResponse = response.getResponse(); responseItemBuilder = DocWriteResponseProtoUtils.toProto(docResponse); - // TODO set the GRPC status instead of HTTP Status - responseItemBuilder.setStatus(docResponse.status().getStatus()); + int grpcStatusCode = RestToGrpcStatusConverter.getGrpcStatusCode(docResponse.status()); + responseItemBuilder.setStatus(grpcStatusCode); } else { BulkItemResponse.Failure failure = response.getFailure(); responseItemBuilder = ResponseItem.newBuilder(); @@ -64,8 +65,8 @@ public static Item toProto(BulkItemResponse response) throws IOException { } else { responseItemBuilder.setId(ResponseItem.Id.newBuilder().setString(response.getId()).build()); } - // TODO set the GRPC status instead of HTTP Status - responseItemBuilder.setStatus(failure.getStatus().getStatus()); + int grpcStatusCode = RestToGrpcStatusConverter.getGrpcStatusCode(failure.getStatus()); + responseItemBuilder.setStatus(grpcStatusCode); ErrorCause errorCause = OpenSearchExceptionProtoUtils.generateThrowableProto(failure.getCause()); diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java index 78ead3d818dcd..873bad5e69e4f 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/services/DocumentServiceImpl.java @@ -14,7 +14,9 @@ import org.opensearch.transport.client.Client; import org.opensearch.transport.grpc.listeners.BulkRequestActionListener; import org.opensearch.transport.grpc.proto.request.document.bulk.BulkRequestProtoUtils; +import org.opensearch.transport.grpc.util.GrpcErrorHandler; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; /** @@ -47,7 +49,8 @@ public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver TOO_MANY_REQUESTS -> + // RESOURCE_EXHAUSTED + assertTrue(result.getMessage().contains("CircuitBreakingException[Memory circuit breaker]")); + } + + public void testSearchPhaseExecutionExceptionInCleanMessage() { + SearchPhaseExecutionException exception = new SearchPhaseExecutionException( + "query", + "Search failed", + new org.opensearch.action.search.ShardSearchFailure[0] + ); + + StatusRuntimeException result = GrpcErrorHandler.convertToGrpcError(exception); + + // SearchPhaseExecutionException with empty shardFailures -> SERVICE_UNAVAILABLE -> UNAVAILABLE + assertEquals(Status.UNAVAILABLE.getCode(), result.getStatus().getCode()); + assertTrue(result.getMessage().contains("SearchPhaseExecutionException[Search failed]")); + } +} diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/RestToGrpcStatusConverterTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/RestToGrpcStatusConverterTests.java new file mode 100644 index 0000000000000..b4dc70f707d7a --- /dev/null +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/util/RestToGrpcStatusConverterTests.java @@ -0,0 +1,103 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport.grpc.util; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +import io.grpc.Status; + +/** + * Tests for RestToGrpcStatusConverter. + * Validates that REST status codes are properly mapped to GRPC status codes. + */ +public class RestToGrpcStatusConverterTests extends OpenSearchTestCase { + + public void testSuccessStatusConversion() { + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.OK)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.CREATED)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.ACCEPTED)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.NO_CONTENT)); + } + + public void testClientErrorConversion() { + assertEquals(Status.INVALID_ARGUMENT, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.BAD_REQUEST)); + assertEquals(Status.PERMISSION_DENIED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.UNAUTHORIZED)); + assertEquals(Status.PERMISSION_DENIED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.FORBIDDEN)); + assertEquals(Status.NOT_FOUND, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.NOT_FOUND)); + assertEquals(Status.UNIMPLEMENTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.METHOD_NOT_ALLOWED)); + assertEquals(Status.ABORTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.CONFLICT)); + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.PRECONDITION_FAILED)); + assertEquals(Status.RESOURCE_EXHAUSTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.TOO_MANY_REQUESTS)); + assertEquals(Status.DEADLINE_EXCEEDED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.REQUEST_TIMEOUT)); + } + + public void testServerErrorConversion() { + assertEquals(Status.INTERNAL, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.INTERNAL_SERVER_ERROR)); + assertEquals(Status.UNIMPLEMENTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.NOT_IMPLEMENTED)); + assertEquals(Status.UNAVAILABLE, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.BAD_GATEWAY)); + assertEquals(Status.UNAVAILABLE, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.SERVICE_UNAVAILABLE)); + assertEquals(Status.DEADLINE_EXCEEDED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.GATEWAY_TIMEOUT)); + assertEquals(Status.RESOURCE_EXHAUSTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.INSUFFICIENT_STORAGE)); + } + + public void testGrpcStatusCodeValues() { + // Test that our getGrpcStatusCode method returns correct numeric values + assertEquals(Status.OK.getCode().value(), RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.OK)); + assertEquals(Status.INVALID_ARGUMENT.getCode().value(), RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.BAD_REQUEST)); + assertEquals(Status.NOT_FOUND.getCode().value(), RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.NOT_FOUND)); + assertEquals(Status.PERMISSION_DENIED.getCode().value(), RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.FORBIDDEN)); + assertEquals( + Status.RESOURCE_EXHAUSTED.getCode().value(), + RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.TOO_MANY_REQUESTS) + ); + assertEquals(Status.INTERNAL.getCode().value(), RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.INTERNAL_SERVER_ERROR)); + assertEquals(Status.UNAVAILABLE.getCode().value(), RestToGrpcStatusConverter.getGrpcStatusCode(RestStatus.SERVICE_UNAVAILABLE)); + } + + public void testAdditionalStatusConversion() { + // 1xx Informational - now properly mapped + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.CONTINUE)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.SWITCHING_PROTOCOLS)); + + // 2xx Success (additional codes) + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.NON_AUTHORITATIVE_INFORMATION)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.RESET_CONTENT)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.PARTIAL_CONTENT)); + assertEquals(Status.OK, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.MULTI_STATUS)); + + // 3xx Redirects - now properly mapped + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.MULTIPLE_CHOICES)); + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.MOVED_PERMANENTLY)); + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.FOUND)); + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.NOT_MODIFIED)); + + // 4xx Additional client errors + assertEquals(Status.PERMISSION_DENIED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.PAYMENT_REQUIRED)); + assertEquals(Status.NOT_FOUND, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.GONE)); + assertEquals(Status.UNAUTHENTICATED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.PROXY_AUTHENTICATION)); + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.LENGTH_REQUIRED)); + assertEquals(Status.OUT_OF_RANGE, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.REQUEST_ENTITY_TOO_LARGE)); + assertEquals(Status.OUT_OF_RANGE, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.REQUESTED_RANGE_NOT_SATISFIED)); + assertEquals(Status.INVALID_ARGUMENT, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.UNPROCESSABLE_ENTITY)); + assertEquals(Status.FAILED_PRECONDITION, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.LOCKED)); + + // 5xx Additional server errors + assertEquals(Status.UNIMPLEMENTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.HTTP_VERSION_NOT_SUPPORTED)); + } + + public void testCommonOpenSearchErrorMappings() { + // Test mappings for common OpenSearch error scenarios + assertEquals(Status.INVALID_ARGUMENT, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.BAD_REQUEST)); + assertEquals(Status.NOT_FOUND, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.NOT_FOUND)); + assertEquals(Status.ABORTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.CONFLICT)); + assertEquals(Status.RESOURCE_EXHAUSTED, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.TOO_MANY_REQUESTS)); + assertEquals(Status.UNAVAILABLE, RestToGrpcStatusConverter.convertRestToGrpcStatus(RestStatus.SERVICE_UNAVAILABLE)); + } +} diff --git a/server/src/test/java/org/opensearch/ExceptionsHelperTests.java b/server/src/test/java/org/opensearch/ExceptionsHelperTests.java index 20527eb4e7dc0..9bf993832e4b1 100644 --- a/server/src/test/java/org/opensearch/ExceptionsHelperTests.java +++ b/server/src/test/java/org/opensearch/ExceptionsHelperTests.java @@ -264,4 +264,28 @@ public void testCauseCycle() { ExceptionsHelper.unwrap(e1, IOException.class); ExceptionsHelper.unwrapCorruption(e1); } + + public void testUnwrapToOpenSearchException() { + // Test with OpenSearchException directly - should return the same exception + OpenSearchException directException = new OpenSearchException("direct error"); + assertSame(directException, ExceptionsHelper.unwrapToOpenSearchException(directException)); + + // Test with nested OpenSearchException - should unwrap to it + OpenSearchException nestedOpenSearchException = new OpenSearchException("nested error"); + RuntimeException wrapper = new RuntimeException("wrapper", nestedOpenSearchException); + assertSame(nestedOpenSearchException, ExceptionsHelper.unwrapToOpenSearchException(wrapper)); + + // Test with non-OpenSearchException - should return original + IllegalArgumentException nonOpenSearchException = new IllegalArgumentException("not opensearch"); + assertSame(nonOpenSearchException, ExceptionsHelper.unwrapToOpenSearchException(nonOpenSearchException)); + + // Test with multiple levels of nesting + OpenSearchException deepNested = new OpenSearchException("deep error"); + RuntimeException level1 = new RuntimeException("level 1", deepNested); + RuntimeException level2 = new RuntimeException("level 2", level1); + assertSame(deepNested, ExceptionsHelper.unwrapToOpenSearchException(level2)); + + // Test with null - should return null + assertThat(ExceptionsHelper.unwrapToOpenSearchException(null), nullValue()); + } }