diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java index 77f0ad2daa4eb..19ea94ae89a40 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelper.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.ParseField; import org.elasticsearch.rest.RestStatus; @@ -112,7 +113,36 @@ public static T requireNonNull(T obj, ParseField paramName) { return requireNonNull(obj, paramName.getPreferredName()); } + /** + * @see org.elasticsearch.ExceptionsHelper#unwrapCause(Throwable) + */ public static Throwable unwrapCause(Throwable t) { - return org.elasticsearch.ExceptionsHelper.unwrapCause(t); + return org.elasticsearch.ExceptionsHelper.unwrapCause(t); + } + + /** + * Unwrap the exception stack and return the most likely cause. + * This method has special handling for {@link SearchPhaseExecutionException} + * where it returns the cause of the first shard failure. + * + * @param t raw Throwable + * @return unwrapped throwable if possible + */ + public static Throwable findSearchExceptionRootCause(Throwable t) { + // circuit breaking exceptions are at the bottom + Throwable unwrappedThrowable = unwrapCause(t); + + if (unwrappedThrowable instanceof SearchPhaseExecutionException) { + SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable; + for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) { + Throwable unwrappedShardFailure = unwrapCause(shardFailure.getCause()); + + if (unwrappedShardFailure instanceof ElasticsearchException) { + return unwrappedShardFailure; + } + } + } + + return unwrappedThrowable; } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/utils/ExceptionsHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/utils/ExceptionsHelper.java index 4c0f1e1286992..2a4a3843e576d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/utils/ExceptionsHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/utils/ExceptionsHelper.java @@ -19,4 +19,11 @@ public static T requireNonNull(T obj, String paramName) { } return obj; } + + /** + * @see org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper#findSearchExceptionRootCause(Throwable) + */ + public static Throwable findSearchExceptionRootCause(Throwable t) { + return org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper.findSearchExceptionRootCause(t); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelperTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelperTests.java new file mode 100644 index 0000000000000..fa82fcd954520 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/ExceptionsHelperTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.utils; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.indices.IndexCreationException; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class ExceptionsHelperTests extends ESTestCase { + + public void testFindSearchExceptionRootCause_GivenWrappedSearchPhaseException() { + SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase", + "partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) }); + + Throwable rootCauseException = ExceptionsHelper.findSearchExceptionRootCause( + new IndexCreationException("test-index", searchPhaseExecutionException)); + + assertThat(rootCauseException.getMessage(), equalTo("for the cause!")); + } + + public void testFindSearchExceptionRootCause_GivenRuntimeException() { + RuntimeException runtimeException = new RuntimeException("nothing to unwrap here"); + assertThat(ExceptionsHelper.findSearchExceptionRootCause(runtimeException), sameInstance(runtimeException)); + } + + public void testFindSearchExceptionRootCause_GivenWrapperException() { + RuntimeException runtimeException = new RuntimeException("cause"); + + Throwable rootCauseException = ExceptionsHelper.findSearchExceptionRootCause( + new IndexCreationException("test-index", runtimeException)); + + assertThat(rootCauseException.getMessage(), equalTo("cause")); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index bb9e91a098280..746b1a04f87cb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -7,7 +7,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ElasticsearchWrapperException; import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; @@ -318,7 +321,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro try { extractedData = dataExtractor.next(); } catch (Exception e) { - LOGGER.debug("[" + jobId + "] error while extracting data", e); + LOGGER.error(new ParameterizedMessage("[{}] error while extracting data", jobId), e); // When extraction problems are encountered, we do not want to advance time. // Instead, it is preferable to retry the given interval next time an extraction // is triggered. @@ -350,7 +353,7 @@ private void run(long start, long end, FlushJobAction.Request flushRequest) thro if (isIsolated) { return; } - LOGGER.debug("[" + jobId + "] error while posting data", e); + LOGGER.error(new ParameterizedMessage("[{}] error while posting data", jobId), e); // a conflict exception means the job state is not open any more. // we should therefore stop the datafeed. @@ -469,7 +472,7 @@ Long lastEndTimeMs() { return lastEndTimeMs; } - static class AnalysisProblemException extends RuntimeException { + static class AnalysisProblemException extends ElasticsearchException implements ElasticsearchWrapperException { final boolean shouldStop; final long nextDelayInMsSinceEpoch; @@ -481,7 +484,7 @@ static class AnalysisProblemException extends RuntimeException { } } - static class ExtractionProblemException extends RuntimeException { + static class ExtractionProblemException extends ElasticsearchException implements ElasticsearchWrapperException { final long nextDelayInMsSinceEpoch; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index fb719307a26e8..69d582d4d6db8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -184,12 +184,12 @@ protected void doRun() { if (endTime == null) { next = e.nextDelayInMsSinceEpoch; } - holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); + holder.problemTracker.reportExtractionProblem(e); } catch (DatafeedJob.AnalysisProblemException e) { if (endTime == null) { next = e.nextDelayInMsSinceEpoch; } - holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); + holder.problemTracker.reportAnalysisProblem(e); if (e.shouldStop) { holder.stop("lookback_analysis_error", TimeValue.timeValueSeconds(20), e); return; @@ -241,10 +241,10 @@ protected void doRun() { holder.problemTracker.reportNonEmptyDataCount(); } catch (DatafeedJob.ExtractionProblemException e) { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; - holder.problemTracker.reportExtractionProblem(e.getCause().getMessage()); + holder.problemTracker.reportExtractionProblem(e); } catch (DatafeedJob.AnalysisProblemException e) { nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch; - holder.problemTracker.reportAnalysisProblem(e.getCause().getMessage()); + holder.problemTracker.reportAnalysisProblem(e); if (e.shouldStop) { holder.stop("realtime_analysis_error", TimeValue.timeValueSeconds(20), e); return; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java index a8260c2eade50..4734310394eb3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/ProblemTracker.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.datafeed; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import java.util.Objects; @@ -42,19 +43,19 @@ class ProblemTracker { /** * Reports as analysis problem if it is different than the last seen problem * - * @param problemMessage the problem message + * @param error the exception */ - public void reportAnalysisProblem(String problemMessage) { - reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, problemMessage); + public void reportAnalysisProblem(DatafeedJob.AnalysisProblemException error) { + reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR, ExceptionsHelper.unwrapCause(error).getMessage()); } /** * Reports as extraction problem if it is different than the last seen problem * - * @param problemMessage the problem message + * @param error the exception */ - public void reportExtractionProblem(String problemMessage) { - reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, problemMessage); + public void reportExtractionProblem(DatafeedJob.ExtractionProblemException error) { + reportProblem(Messages.JOB_AUDIT_DATAFEED_DATA_EXTRACTION_ERROR, ExceptionsHelper.findSearchExceptionRootCause(error).getMessage()); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java index d4deb6c665040..c138c5a8641e5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/ProblemTrackerTests.java @@ -5,6 +5,10 @@ */ package org.elasticsearch.xpack.ml.datafeed; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchWrapperException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.junit.Before; @@ -27,33 +31,43 @@ public void setUpTests() { } public void testReportExtractionProblem() { - problemTracker.reportExtractionProblem("foo"); + problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause")); - verify(auditor).error("foo", "Datafeed is encountering errors extracting data: foo"); + verify(auditor).error("foo", "Datafeed is encountering errors extracting data: cause"); + assertTrue(problemTracker.hasProblems()); + } + + public void testReportExtractionProblem_GivenSearchPhaseExecutionException() { + SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase", + "partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) }); + + problemTracker.reportExtractionProblem(new DatafeedJob.ExtractionProblemException(0L, searchPhaseExecutionException)); + + verify(auditor).error("foo", "Datafeed is encountering errors extracting data: for the cause!"); assertTrue(problemTracker.hasProblems()); } public void testReportAnalysisProblem() { - problemTracker.reportAnalysisProblem("foo"); + problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause")); - verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: foo"); + verify(auditor).error("foo", "Datafeed is encountering errors submitting data for analysis: cause"); assertTrue(problemTracker.hasProblems()); } public void testReportProblem_GivenSameProblemTwice() { - problemTracker.reportExtractionProblem("foo"); - problemTracker.reportAnalysisProblem("foo"); + problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause")); + problemTracker.reportAnalysisProblem(createAnalysisProblem("top level", "cause")); - verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo"); + verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause"); assertTrue(problemTracker.hasProblems()); } public void testReportProblem_GivenSameProblemAfterFinishReport() { - problemTracker.reportExtractionProblem("foo"); + problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause")); problemTracker.finishReport(); - problemTracker.reportExtractionProblem("foo"); + problemTracker.reportExtractionProblem(createExtractionProblem("top level", "cause")); - verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: foo"); + verify(auditor, times(1)).error("foo", "Datafeed is encountering errors extracting data: cause"); assertTrue(problemTracker.hasProblems()); } @@ -108,7 +122,7 @@ public void testFinishReport_GivenNoProblems() { } public void testFinishReport_GivenRecovery() { - problemTracker.reportExtractionProblem("bar"); + problemTracker.reportExtractionProblem(createExtractionProblem("top level", "bar")); problemTracker.finishReport(); problemTracker.finishReport(); @@ -116,4 +130,23 @@ public void testFinishReport_GivenRecovery() { verify(auditor).info("foo", "Datafeed has recovered data extraction and analysis"); assertFalse(problemTracker.hasProblems()); } + + private static DatafeedJob.ExtractionProblemException createExtractionProblem(String error, String cause) { + Exception causeException = new RuntimeException(cause); + Exception wrappedException = new TestWrappedException(error, causeException); + return new DatafeedJob.ExtractionProblemException(0L, wrappedException); + } + + private static DatafeedJob.AnalysisProblemException createAnalysisProblem(String error, String cause) { + Exception causeException = new RuntimeException(cause); + Exception wrappedException = new TestWrappedException(error, causeException); + return new DatafeedJob.AnalysisProblemException(0L, false, wrappedException); + } + + private static class TestWrappedException extends RuntimeException implements ElasticsearchWrapperException { + + TestWrappedException(String message, Throwable cause) { + super(message, cause); + } + } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index d6f6ab4722c2d..9c4c1cc499e2a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -631,7 +631,7 @@ synchronized void stopAndSaveState() { synchronized void handleFailure(Exception e) { logger.warn(new ParameterizedMessage("[{}] transform encountered an exception: ", getJobId()), e); - Throwable unwrappedException = ExceptionRootCauseFinder.getRootCauseException(e); + Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e); if (unwrappedException instanceof CircuitBreakingException) { handleCircuitBreakingException((CircuitBreakingException) unwrappedException); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java index 87cfaeb54cefb..972f01d848242 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinder.java @@ -8,8 +8,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.rest.RestStatus; import java.util.Arrays; @@ -38,30 +36,6 @@ public final class ExceptionRootCauseFinder { ) ); - /** - * Unwrap the exception stack and return the most likely cause. - * - * @param t raw Throwable - * @return unwrapped throwable if possible - */ - public static Throwable getRootCauseException(Throwable t) { - // circuit breaking exceptions are at the bottom - Throwable unwrappedThrowable = org.elasticsearch.ExceptionsHelper.unwrapCause(t); - - if (unwrappedThrowable instanceof SearchPhaseExecutionException) { - SearchPhaseExecutionException searchPhaseException = (SearchPhaseExecutionException) unwrappedThrowable; - for (ShardSearchFailure shardFailure : searchPhaseException.shardFailures()) { - Throwable unwrappedShardFailure = org.elasticsearch.ExceptionsHelper.unwrapCause(shardFailure.getCause()); - - if (unwrappedShardFailure instanceof ElasticsearchException) { - return unwrappedShardFailure; - } - } - } - - return t; - } - /** * Return the best error message possible given a already unwrapped exception. * diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java index 3ede642404a64..8a04ee1016160 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/utils/ExceptionRootCauseFinderTests.java @@ -6,18 +6,14 @@ package org.elasticsearch.xpack.transform.utils; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.DocWriteRequest.OpType; import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.TranslogException; -import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -25,8 +21,6 @@ import java.util.HashMap; import java.util.Map; -import static org.hamcrest.Matchers.equalTo; - public class ExceptionRootCauseFinderTests extends ESTestCase { public void testFetFirstIrrecoverableExceptionFromBulkResponses() { @@ -149,16 +143,6 @@ public void testFetFirstIrrecoverableExceptionFromBulkResponses() { assertNull(ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses.values())); } - public void testGetRootCauseException_GivenWrappedSearchPhaseException() { - SearchPhaseExecutionException searchPhaseExecutionException = new SearchPhaseExecutionException("test-phase", - "partial shards failure", new ShardSearchFailure[] { new ShardSearchFailure(new ElasticsearchException("for the cause!")) }); - - Throwable rootCauseException = ExceptionRootCauseFinder.getRootCauseException( - new IndexCreationException("test-index", searchPhaseExecutionException)); - - assertThat(rootCauseException.getMessage(), equalTo("for the cause!")); - } - private static void assertFirstException(Collection bulkItemResponses, Class expectedClass, String message) { Throwable t = ExceptionRootCauseFinder.getFirstIrrecoverableExceptionFromBulkResponses(bulkItemResponses); assertNotNull(t);