Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Supporting Scripted Metric Aggregation when reducing aggregations in InternalValueCount and InternalAvg ([18411](https://github.com/opensearch-project/OpenSearch/pull18411)))
- Support `search_after` numeric queries with Approximation Framework ([#18896](https://github.com/opensearch-project/OpenSearch/pull/18896))
- Add skip_list parameter to Numeric Field Mappers (default false) ([#18889](https://github.com/opensearch-project/OpenSearch/pull/18889))
- Map to proper GRPC status codes and achieve exception handling parity with HTTP APIs([#18925](https://github.com/opensearch-project/OpenSearch/pull/18925))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
61 changes: 56 additions & 5 deletions libs/core/src/main/java/org/opensearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@
public final class ExceptionsHelper {
private static final Logger logger = LogManager.getLogger(ExceptionsHelper.class);

/**
* Shared error message constants for consistent error handling across HTTP and gRPC protocols.
* These constants ensure that both REST API and gRPC API return identical error messages
* for the same types of exceptions.
*/
public static final class ErrorMessages {
/** Error message for invalid argument exceptions */
public static final String INVALID_ARGUMENT = "Invalid argument";

/** Error message for JSON parsing failures */
public static final String JSON_PARSE_FAILED = "Failed to parse JSON";

/** Error message for rate limiting / rejected execution */
public static final String TOO_MANY_REQUESTS = "Too many requests";

/** Error message for JSON type coercion failures */
public static final String JSON_COERCION_FAILED = "Incompatible JSON value";

/** Error message for content format issues */
public static final String INVALID_CONTENT_FORMAT = "Invalid content format";

/** Error message for compression format issues */
public static final String INVALID_COMPRESSION_FORMAT = "Invalid compression format";

/** Generic fallback error message for unknown exceptions */
public static final String INTERNAL_FAILURE = "Internal failure";

private ErrorMessages() {
// Utility class, no instances
}
}

// utility class: no ctor
private ExceptionsHelper() {}

Expand Down Expand Up @@ -117,16 +149,16 @@ public static String summaryMessage(Throwable t) {
if (t instanceof OpenSearchException) {
return getExceptionSimpleClassName(t) + "[" + t.getMessage() + "]";
} else if (t instanceof IllegalArgumentException) {
return "Invalid argument";
return ErrorMessages.INVALID_ARGUMENT;
} else if (t instanceof InputCoercionException) {
return "Incompatible JSON value";
return ErrorMessages.JSON_COERCION_FAILED;
} else if (t instanceof JsonParseException) {
return "Failed to parse JSON";
return ErrorMessages.JSON_PARSE_FAILED;
} else if (t instanceof OpenSearchRejectedExecutionException) {
return "Too many requests";
return ErrorMessages.TOO_MANY_REQUESTS;
}
}
return "Internal failure";
return ErrorMessages.INTERNAL_FAILURE;
}

public static Throwable unwrapCause(Throwable t) {
Expand All @@ -149,6 +181,25 @@ public static Throwable unwrapCause(Throwable t) {
return result;
}

/**
* Unwraps exception causes up to 10 levels looking for the first OpenSearchException.
* This method is used by both HTTP and gRPC error handling to ensure consistent exception
* unwrapping behavior across protocols.
*
* @param e The exception to unwrap
* @return The first OpenSearchException found in the cause chain, or the original exception if none found
*/
public static Throwable unwrapToOpenSearchException(Throwable e) {
Throwable t = e;
for (int counter = 0; counter < 10 && t != null; counter++) {
if (t instanceof OpenSearchException) {
break;
}
t = t.getCause();
}
return t != null ? t : e;
}

/**
* @deprecated Don't swallow exceptions, allow them to propagate.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,14 +628,8 @@ public static void generateFailureXContent(XContentBuilder builder, ToXContent.P

// Render the exception with a simple message
if (detailed == false) {
Throwable t = e;
for (int counter = 0; counter < 10 && t != null; counter++) {
if (t instanceof OpenSearchException) {
break;
}
t = t.getCause();
}
builder.field(ERROR, ExceptionsHelper.summaryMessage(t != null ? t : e));
Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e);
builder.field(ERROR, ExceptionsHelper.summaryMessage(unwrapped));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.transport.grpc.proto.response.document.bulk.BulkResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -49,7 +51,9 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) {
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (RuntimeException | IOException e) {
responseObserver.onError(e);
logger.error("Failed to convert bulk response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

Expand All @@ -61,7 +65,8 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) {
*/
@Override
public void onFailure(Exception e) {
logger.error("BulkRequestActionListener failed to process bulk request:" + e.getMessage());
responseObserver.onError(e);
logger.error("BulkRequestActionListener failed to process bulk request: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.transport.grpc.proto.response.search.SearchResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -44,13 +46,16 @@ public void onResponse(SearchResponse response) {
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (RuntimeException | IOException e) {
responseObserver.onError(e);
logger.error("Failed to convert search response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

@Override
public void onFailure(Exception e) {
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
responseObserver.onError(e);
logger.error("SearchRequestActionListener failed to process search request: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.transport.grpc.proto.response.document.common.DocWriteResponseProtoUtils;
import org.opensearch.transport.grpc.proto.response.document.get.GetResultProtoUtils;
import org.opensearch.transport.grpc.proto.response.exceptions.opensearchexception.OpenSearchExceptionProtoUtils;
import org.opensearch.transport.grpc.util.RestToGrpcStatusConverter;

import java.io.IOException;

Expand Down Expand Up @@ -52,8 +53,8 @@ public static Item toProto(BulkItemResponse response) throws IOException {
DocWriteResponse docResponse = response.getResponse();
responseItemBuilder = DocWriteResponseProtoUtils.toProto(docResponse);

// TODO set the GRPC status instead of HTTP Status
responseItemBuilder.setStatus(docResponse.status().getStatus());
int grpcStatusCode = RestToGrpcStatusConverter.getGrpcStatusCode(docResponse.status());
responseItemBuilder.setStatus(grpcStatusCode);
} else {
BulkItemResponse.Failure failure = response.getFailure();
responseItemBuilder = ResponseItem.newBuilder();
Expand All @@ -64,8 +65,8 @@ public static Item toProto(BulkItemResponse response) throws IOException {
} else {
responseItemBuilder.setId(ResponseItem.Id.newBuilder().setString(response.getId()).build());
}
// TODO set the GRPC status instead of HTTP Status
responseItemBuilder.setStatus(failure.getStatus().getStatus());
int grpcStatusCode = RestToGrpcStatusConverter.getGrpcStatusCode(failure.getStatus());
responseItemBuilder.setStatus(grpcStatusCode);

ErrorCause errorCause = OpenSearchExceptionProtoUtils.generateThrowableProto(failure.getCause());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.opensearch.transport.client.Client;
import org.opensearch.transport.grpc.listeners.BulkRequestActionListener;
import org.opensearch.transport.grpc.proto.request.document.bulk.BulkRequestProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -47,7 +49,8 @@ public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver<or
client.bulk(bulkRequest, listener);
} catch (RuntimeException e) {
logger.error("DocumentServiceImpl failed to process bulk request, request=" + request + ", error=" + e.getMessage());
responseObserver.onError(e);
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import org.opensearch.transport.grpc.listeners.SearchRequestActionListener;
import org.opensearch.transport.grpc.proto.request.search.SearchRequestProtoUtils;
import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -66,7 +68,8 @@ public void search(
client.search(searchRequest, listener);
} catch (RuntimeException | IOException e) {
logger.error("SearchServiceImpl failed to process search request, request=" + request + ", error=" + e.getMessage());
responseObserver.onError(e);
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.util;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.exc.InputCoercionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.core.compress.NotCompressedException;
import org.opensearch.core.compress.NotXContentException;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

/**
* Converts exceptions to a GRPC StatusRuntimeException.
*/
public class GrpcErrorHandler {
private static final Logger logger = LogManager.getLogger(GrpcErrorHandler.class);

private GrpcErrorHandler() {
// Utility class, no instances
}

/**
* Converts an exception to an appropriate GRPC StatusRuntimeException.
* Uses shared constants from {@link ExceptionsHelper.ErrorMessages} and {@link ExceptionsHelper#summaryMessage}
* for exact parity with HTTP error handling.
*
* @param e The exception to convert
* @return StatusRuntimeException with appropriate GRPC status and HTTP-identical error messages
*/
public static StatusRuntimeException convertToGrpcError(Exception e) {
// ========== OpenSearch Business Logic Exceptions ==========
// Custom OpenSearch exceptions which extend {@link OpenSearchException}.
// Uses {@link RestToGrpcStatusConverter} for REST -> gRPC status mapping and
// follows {@link OpenSearchException#generateFailureXContent} unwrapping logic
if (e instanceof OpenSearchException) {
return handleOpenSearchException((OpenSearchException) e);
}

// ========== OpenSearch Core System Exceptions ==========
// Low-level OpenSearch exceptions that don't extend OpenSearchException - include full details
else if (e instanceof OpenSearchRejectedExecutionException) {
return Status.RESOURCE_EXHAUSTED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof NotXContentException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof NotCompressedException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}

// ========== 3. Third-party Library Exceptions ==========
// External library exceptions (Jackson JSON parsing) - include full details
else if (e instanceof InputCoercionException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof JsonParseException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}

// ========== 4. Standard Java Exceptions ==========
// Generic Java runtime exceptions - include full exception details for debugging
else if (e instanceof IllegalArgumentException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof IllegalStateException) {
return Status.FAILED_PRECONDITION.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof SecurityException) {
return Status.PERMISSION_DENIED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof InterruptedException) {
return Status.CANCELLED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof IOException) {
return Status.INTERNAL.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}

// ========== 5. Unknown/Unmapped Exceptions ==========
// Safety fallback for any unexpected exception to {@code Status.INTERNAL} with full debugging info
else {
logger.warn("Unmapped exception type: {}, treating as INTERNAL error", e.getClass().getSimpleName());
return Status.INTERNAL.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}
}

/**
* Handles OpenSearch-specific exceptions by converting their HTTP status to GRPC status.
* Uses {@link ExceptionsHelper#summaryMessage(Throwable)} for exact parity with HTTP error handling.
*
* Uses {@link ExceptionsHelper#unwrapToOpenSearchException(Throwable)} for shared unwrapping logic
* with HTTP's {@link OpenSearchException#generateFailureXContent}.
*
* @param e The {@link OpenSearchException} to convert
* @return StatusRuntimeException with mapped GRPC status and HTTP-identical error message
*/
private static StatusRuntimeException handleOpenSearchException(OpenSearchException e) {
Status grpcStatus = RestToGrpcStatusConverter.convertRestToGrpcStatus(e.status());

Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e);

String description = ExceptionsHelper.summaryMessage(unwrapped);
return grpcStatus.withDescription(description).asRuntimeException();
}

}
Loading
Loading