Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
- APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
- Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
- Map to proper GRPC status codes and achieve exception handling parity with HTTP APIs([#18925](https://github.com/opensearch-project/OpenSearch/pull/18925))

### Changed
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))
Expand Down
61 changes: 56 additions & 5 deletions libs/core/src/main/java/org/opensearch/ExceptionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@
public final class ExceptionsHelper {
private static final Logger logger = LogManager.getLogger(ExceptionsHelper.class);

/**
* Shared error message constants for consistent error handling across HTTP and gRPC protocols.
* These constants ensure that both REST API and gRPC API return identical error messages
* for the same types of exceptions.
*/
public static class ErrorMessages {
/** Error message for invalid argument exceptions */
public static final String INVALID_ARGUMENT = "Invalid argument";

/** Error message for JSON parsing failures */
public static final String JSON_PARSE_FAILED = "Failed to parse JSON";

/** Error message for rate limiting / rejected execution */
public static final String TOO_MANY_REQUESTS = "Too many requests";

/** Error message for JSON type coercion failures */
public static final String JSON_COERCION_FAILED = "Incompatible JSON value";

/** Error message for content format issues */
public static final String INVALID_CONTENT_FORMAT = "Invalid content format";

/** Error message for compression format issues */
public static final String INVALID_COMPRESSION_FORMAT = "Invalid compression format";

/** Generic fallback error message for unknown exceptions */
public static final String INTERNAL_FAILURE = "Internal failure";

private ErrorMessages() {
// Utility class, no instances
}
}

// utility class: no ctor
private ExceptionsHelper() {}

Expand Down Expand Up @@ -117,16 +149,16 @@ public static String summaryMessage(Throwable t) {
if (t instanceof OpenSearchException) {
return getExceptionSimpleClassName(t) + "[" + t.getMessage() + "]";
} else if (t instanceof IllegalArgumentException) {
return "Invalid argument";
return ErrorMessages.INVALID_ARGUMENT;
} else if (t instanceof InputCoercionException) {
return "Incompatible JSON value";
return ErrorMessages.JSON_COERCION_FAILED;
} else if (t instanceof JsonParseException) {
return "Failed to parse JSON";
return ErrorMessages.JSON_PARSE_FAILED;
} else if (t instanceof OpenSearchRejectedExecutionException) {
return "Too many requests";
return ErrorMessages.TOO_MANY_REQUESTS;
}
}
return "Internal failure";
return ErrorMessages.INTERNAL_FAILURE;
}

public static Throwable unwrapCause(Throwable t) {
Expand All @@ -149,6 +181,25 @@ public static Throwable unwrapCause(Throwable t) {
return result;
}

/**
* Unwraps exception causes up to 10 levels looking for the first OpenSearchException.
* This method is used by both HTTP and gRPC error handling to ensure consistent exception
* unwrapping behavior across protocols.
*
* @param e The exception to unwrap
* @return The first OpenSearchException found in the cause chain, or the original exception if none found
*/
public static Throwable unwrapToOpenSearchException(Throwable e) {
Throwable t = e;
for (int counter = 0; counter < 10 && t != null; counter++) {
if (t instanceof OpenSearchException) {
break;
}
t = t.getCause();
}
return t != null ? t : e;
}

/**
* @deprecated Don't swallow exceptions, allow them to propagate.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,14 +628,8 @@ public static void generateFailureXContent(XContentBuilder builder, ToXContent.P

// Render the exception with a simple message
if (detailed == false) {
Throwable t = e;
for (int counter = 0; counter < 10 && t != null; counter++) {
if (t instanceof OpenSearchException) {
break;
}
t = t.getCause();
}
builder.field(ERROR, ExceptionsHelper.summaryMessage(t != null ? t : e));
Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e);
builder.field(ERROR, ExceptionsHelper.summaryMessage(unwrapped));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.transport.grpc.proto.response.document.bulk.BulkResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -49,7 +51,9 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) {
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (RuntimeException | IOException e) {
responseObserver.onError(e);
logger.error("Failed to convert bulk response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

Expand All @@ -61,7 +65,8 @@ public void onResponse(org.opensearch.action.bulk.BulkResponse response) {
*/
@Override
public void onFailure(Exception e) {
logger.error("BulkRequestActionListener failed to process bulk request:" + e.getMessage());
responseObserver.onError(e);
logger.error("BulkRequestActionListener failed to process bulk request: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.transport.grpc.proto.response.search.SearchResponseProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -44,13 +46,16 @@ public void onResponse(SearchResponse response) {
responseObserver.onNext(protoResponse);
responseObserver.onCompleted();
} catch (RuntimeException | IOException e) {
responseObserver.onError(e);
logger.error("Failed to convert search response to protobuf: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}

@Override
public void onFailure(Exception e) {
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
responseObserver.onError(e);
logger.error("SearchRequestActionListener failed to process search request: " + e.getMessage());
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.transport.grpc.proto.response.document.common.DocWriteResponseProtoUtils;
import org.opensearch.transport.grpc.proto.response.document.get.GetResultProtoUtils;
import org.opensearch.transport.grpc.proto.response.exceptions.opensearchexception.OpenSearchExceptionProtoUtils;
import org.opensearch.transport.grpc.util.HttpToGrpcStatusConverter;

import java.io.IOException;

Expand Down Expand Up @@ -52,8 +53,8 @@ public static Item toProto(BulkItemResponse response) throws IOException {
DocWriteResponse docResponse = response.getResponse();
responseItemBuilder = DocWriteResponseProtoUtils.toProto(docResponse);

// TODO set the GRPC status instead of HTTP Status
responseItemBuilder.setStatus(docResponse.status().getStatus());
int grpcStatusCode = HttpToGrpcStatusConverter.getGrpcStatusCode(docResponse.status());
responseItemBuilder.setStatus(grpcStatusCode);
} else {
BulkItemResponse.Failure failure = response.getFailure();
responseItemBuilder = ResponseItem.newBuilder();
Expand All @@ -64,8 +65,8 @@ public static Item toProto(BulkItemResponse response) throws IOException {
} else {
responseItemBuilder.setId(ResponseItem.Id.newBuilder().setString(response.getId()).build());
}
// TODO set the GRPC status instead of HTTP Status
responseItemBuilder.setStatus(failure.getStatus().getStatus());
int grpcStatusCode = HttpToGrpcStatusConverter.getGrpcStatusCode(failure.getStatus());
responseItemBuilder.setStatus(grpcStatusCode);

ErrorCause errorCause = OpenSearchExceptionProtoUtils.generateThrowableProto(failure.getCause());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.opensearch.transport.client.Client;
import org.opensearch.transport.grpc.listeners.BulkRequestActionListener;
import org.opensearch.transport.grpc.proto.request.document.bulk.BulkRequestProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -47,7 +49,8 @@ public void bulk(org.opensearch.protobufs.BulkRequest request, StreamObserver<or
client.bulk(bulkRequest, listener);
} catch (RuntimeException e) {
logger.error("DocumentServiceImpl failed to process bulk request, request=" + request + ", error=" + e.getMessage());
responseObserver.onError(e);
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import org.opensearch.transport.grpc.listeners.SearchRequestActionListener;
import org.opensearch.transport.grpc.proto.request.search.SearchRequestProtoUtils;
import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.transport.grpc.util.GrpcErrorHandler;

import java.io.IOException;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

/**
Expand Down Expand Up @@ -66,7 +68,8 @@ public void search(
client.search(searchRequest, listener);
} catch (RuntimeException | IOException e) {
logger.error("SearchServiceImpl failed to process search request, request=" + request + ", error=" + e.getMessage());
responseObserver.onError(e);
StatusRuntimeException grpcError = GrpcErrorHandler.convertToGrpcError(e);
responseObserver.onError(grpcError);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.util;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.exc.InputCoercionException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.core.compress.NotCompressedException;
import org.opensearch.core.compress.NotXContentException;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

/**
* Converts exceptions to a GRPC StatusRuntimeException.
*/
public class GrpcErrorHandler {
private static final Logger logger = LogManager.getLogger(GrpcErrorHandler.class);

private GrpcErrorHandler() {
// Utility class, no instances
}

/**
* Converts an exception to an appropriate GRPC StatusRuntimeException.
* Uses shared constants from {@link ExceptionsHelper.ErrorMessages} and {@link ExceptionsHelper#summaryMessage}
* for exact parity with HTTP error handling.
*
* @param e The exception to convert
* @return StatusRuntimeException with appropriate GRPC status and HTTP-identical error messages
*/
public static StatusRuntimeException convertToGrpcError(Exception e) {
// ========== OpenSearch Business Logic Exceptions ==========
// Custom OpenSearch exceptions which extend {@link OpenSearchException}.
// Uses {@link HttpToGrpcStatusConverter} for HTTP -> gRPC status mapping and
// follows {@link OpenSearchException#generateFailureXContent} unwrapping logic
if (e instanceof OpenSearchException) {
return handleOpenSearchException((OpenSearchException) e);
}

// ========== OpenSearch Core System Exceptions ==========
// Low-level OpenSearch exceptions that don't extend OpenSearchException - include full details
else if (e instanceof OpenSearchRejectedExecutionException) {
return Status.RESOURCE_EXHAUSTED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof NotXContentException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof NotCompressedException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}

// ========== 3. Third-party Library Exceptions ==========
// External library exceptions (Jackson JSON parsing) - include full details
else if (e instanceof InputCoercionException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof JsonParseException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}

// ========== 4. Standard Java Exceptions ==========
// Generic Java runtime exceptions - include full exception details for debugging
else if (e instanceof IllegalArgumentException) {
return Status.INVALID_ARGUMENT.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof IllegalStateException) {
return Status.FAILED_PRECONDITION.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof SecurityException) {
return Status.PERMISSION_DENIED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof TimeoutException) {
return Status.DEADLINE_EXCEEDED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof InterruptedException) {
return Status.CANCELLED.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
} else if (e instanceof IOException) {
return Status.INTERNAL.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}

// ========== 5. Unknown/Unmapped Exceptions ==========
// Safety fallback for any unexpected exception to {@code Status.INTERNAL} with full debugging info
else {
logger.warn("Unmapped exception type: {}, treating as INTERNAL error", e.getClass().getSimpleName());
return Status.INTERNAL.withDescription(ExceptionsHelper.stackTrace(e)).asRuntimeException();
}
}

/**
* Handles OpenSearch-specific exceptions by converting their HTTP status to GRPC status.
* Uses {@link ExceptionsHelper#summaryMessage(Throwable)} for exact parity with HTTP error handling.
*
* Uses {@link ExceptionsHelper#unwrapToOpenSearchException(Throwable)} for shared unwrapping logic
* with HTTP's {@link OpenSearchException#generateFailureXContent}.
*
* @param e The {@link OpenSearchException} to convert
* @return StatusRuntimeException with mapped GRPC status and HTTP-identical error message
*/
private static StatusRuntimeException handleOpenSearchException(OpenSearchException e) {
Status grpcStatus = HttpToGrpcStatusConverter.convertHttpToGrpcStatus(e.status());

Throwable unwrapped = ExceptionsHelper.unwrapToOpenSearchException(e);

String description = ExceptionsHelper.summaryMessage(unwrapped);
return grpcStatus.withDescription(description).asRuntimeException();
}

}
Loading
Loading