diff --git a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationErrorCategory.java b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationErrorCategory.java new file mode 100644 index 0000000000..c5b765e239 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationErrorCategory.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.failure; + +/** + * Used to categorize application failures, for example, to distinguish benign errors from others. + * + * @see io.temporal.api.enums.v1.ApplicationErrorCategory + */ +public enum ApplicationErrorCategory { + /** Default category, used when none was specified; such failures are not treated as benign. */ + UNSPECIFIED, + /** Expected application error with little/no severity. */ + BENIGN, + ; +} diff --git a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java index ffd63898cc..94fe3bae39 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/ApplicationFailure.java @@ -51,6 +51,7 @@ *
  • nonRetryable is set to false *
  • details are set to null *
  • stack trace is copied from the original exception + *
  • category is set to ApplicationErrorCategory.UNSPECIFIED * */ public final class ApplicationFailure extends TemporalFailure { @@ -58,6 +59,7 @@ public final class ApplicationFailure extends TemporalFailure { private final Values details; private boolean nonRetryable; private Duration nextRetryDelay; + private final ApplicationErrorCategory category; /** * New ApplicationFailure with {@link #isNonRetryable()} flag set to false. @@ -92,7 +94,14 @@ public static ApplicationFailure newFailure(String message, String type, Object. */ public static ApplicationFailure newFailureWithCause( String message, String type, @Nullable Throwable cause, Object... details) { - return new ApplicationFailure(message, type, false, new EncodedValues(details), cause, null); + return new ApplicationFailure( + message, + type, + false, + new EncodedValues(details), + cause, + null, + ApplicationErrorCategory.UNSPECIFIED); } /** @@ -118,7 +127,13 @@ public static ApplicationFailure newFailureWithCauseAndDelay( Duration nextRetryDelay, Object... details) { return new ApplicationFailure( - message, type, false, new EncodedValues(details), cause, nextRetryDelay); + message, + type, + false, + new EncodedValues(details), + cause, + nextRetryDelay, + ApplicationErrorCategory.UNSPECIFIED); } /** @@ -153,7 +168,40 @@ public static ApplicationFailure newNonRetryableFailure( */ public static ApplicationFailure newNonRetryableFailureWithCause( String message, String type, @Nullable Throwable cause, Object... details) { - return new ApplicationFailure(message, type, true, new EncodedValues(details), cause, null); + return new ApplicationFailure( + message, + type, + true, + new EncodedValues(details), + cause, + null, + ApplicationErrorCategory.UNSPECIFIED); + } + + /** + * New ApplicationFailure with a specified category and {@link #isNonRetryable()} flag set to + * false. + * + *

    Note that this exception still may not be retried by the service if its type is included in + * the doNotRetry property of the correspondent retry policy. + * + * @param message optional error message + * @param type error type + * @param category the category of the application failure. + * @param cause failure cause. Each element of the cause chain will be converted to + * ApplicationFailure for network transmission across network if it doesn't extend {@link + * TemporalFailure} + * @param details optional details about the failure. They are serialized using the same approach + * as arguments and results. + */ + public static ApplicationFailure newFailureWithCategory( + String message, + String type, + ApplicationErrorCategory category, + @Nullable Throwable cause, + Object... details) { + return new ApplicationFailure( + message, type, false, new EncodedValues(details), cause, null, category); } static ApplicationFailure newFromValues( @@ -162,8 +210,10 @@ static ApplicationFailure newFromValues( boolean nonRetryable, Values details, Throwable cause, - Duration nextRetryDelay) { - return new ApplicationFailure(message, type, nonRetryable, details, cause, nextRetryDelay); + Duration nextRetryDelay, + ApplicationErrorCategory category) { + return new ApplicationFailure( + message, type, nonRetryable, details, cause, nextRetryDelay, category); } ApplicationFailure( @@ -172,12 +222,14 @@ static ApplicationFailure newFromValues( boolean nonRetryable, Values details, Throwable cause, - Duration nextRetryDelay) { + Duration nextRetryDelay, + ApplicationErrorCategory category) { super(getMessage(message, Objects.requireNonNull(type), nonRetryable), message, cause); this.type = type; this.details = details; this.nonRetryable = nonRetryable; this.nextRetryDelay = nextRetryDelay; + this.category = category; } public String getType() { @@ -210,6 +262,10 @@ public void setNextRetryDelay(Duration nextRetryDelay) { this.nextRetryDelay = nextRetryDelay; } + public 
ApplicationErrorCategory getApplicationErrorCategory() { + return category; + } + private static String getMessage(String message, String type, boolean nonRetryable) { return (Strings.isNullOrEmpty(message) ? "" : "message='" + message + "', ") + "type='" diff --git a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java index 3a255f31c6..0d181528bd 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java @@ -34,6 +34,7 @@ import io.temporal.common.converter.EncodedValues; import io.temporal.common.converter.FailureConverter; import io.temporal.internal.activity.ActivityTaskHandlerImpl; +import io.temporal.internal.common.FailureUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.sync.POJOWorkflowImplementationFactory; import io.temporal.serviceclient.CheckedExceptionWrapper; @@ -106,7 +107,8 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d cause, info.hasNextRetryDelay() ? ProtobufTimeUtils.toJavaDuration(info.getNextRetryDelay()) - : null); + : null, + FailureUtils.categoryFromProto(info.getCategory())); } case TIMEOUT_FAILURE_INFO: { @@ -146,13 +148,14 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d info.hasLastHeartbeatDetails() ? 
Optional.of(info.getLastHeartbeatDetails()) : Optional.empty(); - return new ApplicationFailure( + return ApplicationFailure.newFromValues( failure.getMessage(), "ResetWorkflow", false, new EncodedValues(details, dataConverter), cause, - null); + null, + ApplicationErrorCategory.UNSPECIFIED); } case ACTIVITY_FAILURE_INFO: { @@ -214,7 +217,8 @@ private RuntimeException failureToExceptionImpl(Failure failure, DataConverter d false, new EncodedValues(Optional.empty(), dataConverter), cause, - null); + null, + ApplicationErrorCategory.UNSPECIFIED); } } @@ -260,7 +264,8 @@ private Failure exceptionToFailure(Throwable throwable) { ApplicationFailureInfo.Builder info = ApplicationFailureInfo.newBuilder() .setType(ae.getType()) - .setNonRetryable(ae.isNonRetryable()); + .setNonRetryable(ae.isNonRetryable()) + .setCategory(FailureUtils.categoryToProto(ae.getApplicationErrorCategory())); Optional details = ((EncodedValues) ae.getDetails()).toPayloads(); if (details.isPresent()) { info.setDetails(details.get()); @@ -352,7 +357,10 @@ private Failure exceptionToFailure(Throwable throwable) { ApplicationFailureInfo.Builder info = ApplicationFailureInfo.newBuilder() .setType(throwable.getClass().getName()) - .setNonRetryable(false); + .setNonRetryable(false) + .setCategory( + io.temporal.api.enums.v1.ApplicationErrorCategory + .APPLICATION_ERROR_CATEGORY_UNSPECIFIED); failure.setApplicationFailureInfo(info); } return failure.build(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java index 1921ae94ed..8d59b7d64f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskExecutors.java @@ -36,6 +36,7 @@ import io.temporal.common.interceptors.ActivityInboundCallsInterceptor.ActivityOutput; import io.temporal.common.interceptors.Header; 
import io.temporal.common.interceptors.WorkerInterceptor; +import io.temporal.internal.common.FailureUtils; import io.temporal.internal.worker.ActivityTaskHandler; import io.temporal.payload.context.ActivitySerializationContext; import io.temporal.serviceclient.CheckedExceptionWrapper; @@ -122,6 +123,14 @@ public ActivityTaskHandler.Result execute(ActivityInfoInternal info, Scope metri info.getActivityId(), info.getActivityType(), info.getAttempt()); + } else if (FailureUtils.isBenignApplicationFailure(ex)) { + log.debug( + "{} failure. ActivityId={}, activityType={}, attempt={}", + local ? "Local activity" : "Activity", + info.getActivityId(), + info.getActivityType(), + info.getAttempt(), + ex); } else { log.warn( "{} failure. ActivityId={}, activityType={}, attempt={}", diff --git a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java index 2575c4af55..13f6d585a7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/activity/ActivityTaskHandlerImpl.java @@ -36,6 +36,7 @@ import io.temporal.common.metadata.POJOActivityImplMetadata; import io.temporal.common.metadata.POJOActivityMethodMetadata; import io.temporal.internal.activity.ActivityTaskExecutors.ActivityTaskExecutor; +import io.temporal.internal.common.FailureUtils; import io.temporal.internal.common.env.ReflectionUtils; import io.temporal.internal.worker.ActivityTask; import io.temporal.internal.worker.ActivityTaskHandler; @@ -209,11 +210,13 @@ static ActivityTaskHandler.Result mapToActivityFailure( Scope ms = metricsScope.tagged( ImmutableMap.of(MetricsTag.EXCEPTION, exception.getClass().getSimpleName())); - if (isLocalActivity) { - ms.counter(MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER).inc(1); - ms.counter(MetricsType.LOCAL_ACTIVITY_FAILED_COUNTER).inc(1); - } else { - 
ms.counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1); + if (!FailureUtils.isBenignApplicationFailure(exception)) { + if (isLocalActivity) { + ms.counter(MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER).inc(1); + ms.counter(MetricsType.LOCAL_ACTIVITY_FAILED_COUNTER).inc(1); + } else { + ms.counter(MetricsType.ACTIVITY_EXEC_FAILED_COUNTER).inc(1); + } } Failure failure = dataConverter.exceptionToFailure(exception); RespondActivityTaskFailedRequest.Builder result = diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/FailureUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/FailureUtils.java new file mode 100644 index 0000000000..272fd01771 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/FailureUtils.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.temporal.internal.common; + +import io.temporal.api.failure.v1.Failure; +import io.temporal.failure.ApplicationErrorCategory; +import io.temporal.failure.ApplicationFailure; +import javax.annotation.Nullable; + +public class FailureUtils { + private FailureUtils() {} + + public static boolean isBenignApplicationFailure(@Nullable Throwable t) { + if (t instanceof ApplicationFailure + && ((ApplicationFailure) t).getApplicationErrorCategory() + == ApplicationErrorCategory.BENIGN) { + return true; + } + return false; + } + + public static boolean isBenignApplicationFailure(@Nullable Failure failure) { + if (failure != null + && failure.getApplicationFailureInfo() != null + && FailureUtils.categoryFromProto(failure.getApplicationFailureInfo().getCategory()) + == ApplicationErrorCategory.BENIGN) { + return true; + } + return false; + } + + public static ApplicationErrorCategory categoryFromProto( + io.temporal.api.enums.v1.ApplicationErrorCategory protoCategory) { + if (protoCategory == null) { + return ApplicationErrorCategory.UNSPECIFIED; + } + switch (protoCategory) { + case APPLICATION_ERROR_CATEGORY_BENIGN: + return ApplicationErrorCategory.BENIGN; + case APPLICATION_ERROR_CATEGORY_UNSPECIFIED: + case UNRECOGNIZED: + default: + // Fallback unrecognized or unspecified proto values as UNSPECIFIED + return ApplicationErrorCategory.UNSPECIFIED; + } + } + + public static io.temporal.api.enums.v1.ApplicationErrorCategory categoryToProto( + io.temporal.failure.ApplicationErrorCategory category) { + switch (category) { + case BENIGN: + return io.temporal.api.enums.v1.ApplicationErrorCategory.APPLICATION_ERROR_CATEGORY_BENIGN; + case UNSPECIFIED: + default: + // Fallback to UNSPECIFIED for unknown values + return io.temporal.api.enums.v1.ApplicationErrorCategory + .APPLICATION_ERROR_CATEGORY_UNSPECIFIED; + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java 
b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java index ccae143c7a..c671b770bc 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowExecutor.java @@ -34,6 +34,7 @@ import io.temporal.api.update.v1.Input; import io.temporal.api.update.v1.Request; import io.temporal.failure.CanceledFailure; +import io.temporal.internal.common.FailureUtils; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.internal.common.UpdateMessage; import io.temporal.internal.statemachines.WorkflowStateMachines; @@ -153,7 +154,9 @@ private void completeWorkflow(@Nullable WorkflowExecutionException failure) { metricsScope.counter(MetricsType.WORKFLOW_CANCELED_COUNTER).inc(1); } else if (failure != null) { workflowStateMachines.failWorkflow(failure.getFailure()); - metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1); + if (!FailureUtils.isBenignApplicationFailure(failure.getFailure())) { + metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1); + } } else { ContinueAsNewWorkflowExecutionCommandAttributes attributes = context.getContinueAsNewOnCompletion(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java index 3214673445..a831447027 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java @@ -42,6 +42,7 @@ import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder; import io.temporal.internal.Config; +import io.temporal.internal.common.FailureUtils; import io.temporal.internal.common.SdkFlag; import 
io.temporal.internal.common.UpdateMessage; import io.temporal.internal.statemachines.ExecuteLocalActivityParameters; @@ -266,12 +267,15 @@ private void applyServerHistory(long lastEventId, WorkflowHistoryIterator histor implementationOptions.getFailWorkflowExceptionTypes(); for (Class failType : failTypes) { if (failType.isAssignableFrom(e.getClass())) { - metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1); + if (!FailureUtils.isBenignApplicationFailure(e)) { + metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1); + } throw new WorkflowExecutionException( workflow.getWorkflowContext().mapWorkflowExceptionToFailure(e)); } } - if (e instanceof WorkflowExecutionException) { + if (e instanceof WorkflowExecutionException + && !FailureUtils.isBenignApplicationFailure(e)) { metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1); } throw wrap(e); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/ActivityFailedMetricsTests.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/ActivityFailedMetricsTests.java new file mode 100644 index 0000000000..3386624cb0 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/ActivityFailedMetricsTests.java @@ -0,0 +1,318 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.worker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.activity.LocalActivityOptions; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.common.reporter.TestStatsReporter; +import io.temporal.failure.ApplicationErrorCategory; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.MetricsType; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +public class ActivityFailedMetricsTests { + private final TestStatsReporter reporter = new TestStatsReporter(); + + private static final ListAppender listAppender = new ListAppender<>(); + + static { + LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory(); + ch.qos.logback.classic.Logger logger = context.getLogger("io.temporal.internal.activity"); + listAppender.setContext(context); + listAppender.start(); + 
logger.addAppender(listAppender); + logger.setLevel(Level.DEBUG); // Ensure we capture both debug and warn levels + } + + Scope metricsScope = + new RootScopeBuilder().reporter(reporter).reportEvery(com.uber.m3.util.Duration.ofMillis(1)); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setMetricsScope(metricsScope) + .setWorkflowTypes(ActivityWorkflowImpl.class, LocalActivityWorkflowImpl.class) + .setActivityImplementations(new TestActivityImpl()) + .build(); + + @Before + public void setup() { + reporter.flush(); + listAppender.list.clear(); + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + void execute(boolean isBenign); + } + + @WorkflowInterface + public interface ActivityWorkflow { + @WorkflowMethod + void execute(boolean isBenign); + } + + @WorkflowInterface + public interface LocalActivityWorkflow { + @WorkflowMethod + void execute(boolean isBenign); + } + + public static class TestActivityImpl implements TestActivity { + @Override + public void execute(boolean isBenign) { + if (!isBenign) { + throw ApplicationFailure.newFailure("Non-benign activity failure", "NonBenignType"); + } else { + throw ApplicationFailure.newFailureWithCategory( + "Benign activity failure", "BenignType", ApplicationErrorCategory.BENIGN, null); + } + } + } + + public static class ActivityWorkflowImpl implements ActivityWorkflow { + @Override + public void execute(boolean isBenign) { + TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(3)) + .setRetryOptions( + io.temporal.common.RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build()); + activity.execute(isBenign); + } + } + + public static class LocalActivityWorkflowImpl implements LocalActivityWorkflow { + @Override + public void execute(boolean isBenign) { + TestActivity activity = + Workflow.newLocalActivityStub( + TestActivity.class, + 
LocalActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(3)) + .setRetryOptions( + io.temporal.common.RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build()); + activity.execute(isBenign); + } + } + + private Map getActivityTagsWithWorkerType( + String workerType, String workflowType) { + Map tags = new HashMap<>(); + tags.put("task_queue", testWorkflowRule.getTaskQueue()); + tags.put("namespace", "UnitTest"); + tags.put("activity_type", "Execute"); + tags.put("exception", "ApplicationFailure"); + tags.put("worker_type", workerType); + tags.put("workflow_type", workflowType); + return tags; + } + + private int countLogMessages(String message, Level level) { + int count = 0; + List list = new ArrayList<>(listAppender.list); + for (ILoggingEvent event : list) { + if (event.getFormattedMessage().contains(message) && event.getLevel() == level) { + count++; + } + } + return count; + } + + @Test + public void activityFailureMetricBenignApplicationError() { + reporter.assertNoMetric( + MetricsType.ACTIVITY_EXEC_FAILED_COUNTER, + getActivityTagsWithWorkerType("ActivityWorker", "ActivityWorkflow")); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + + WorkflowFailedException nonBenignErr = + assertThrows( + WorkflowFailedException.class, + () -> + client + .newWorkflowStub( + ActivityWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(false)); + + assertTrue( + "Cause should be ActivityFailure", + nonBenignErr.getCause() instanceof io.temporal.failure.ActivityFailure); + assertTrue( + "Inner cause should be ApplicationFailure", + nonBenignErr.getCause().getCause() instanceof ApplicationFailure); + ApplicationFailure af = (ApplicationFailure) nonBenignErr.getCause().getCause(); + assertFalse( + "Failure should not be benign", + af.getApplicationErrorCategory() == ApplicationErrorCategory.BENIGN); + assertEquals("Non-benign activity 
failure", af.getOriginalMessage()); + + reporter.assertCounter( + MetricsType.ACTIVITY_EXEC_FAILED_COUNTER, + getActivityTagsWithWorkerType("ActivityWorker", "ActivityWorkflow"), + 1); + + // Execute workflow with benign activity failure + WorkflowFailedException benignErr = + assertThrows( + WorkflowFailedException.class, + () -> + client + .newWorkflowStub( + ActivityWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(true)); + + assertTrue( + "Cause should be ActivityFailure", + benignErr.getCause() instanceof io.temporal.failure.ActivityFailure); + assertTrue( + "Inner cause should be ApplicationFailure", + benignErr.getCause().getCause() instanceof ApplicationFailure); + ApplicationFailure af2 = (ApplicationFailure) benignErr.getCause().getCause(); + assertTrue( + "Failure should be benign", + af2.getApplicationErrorCategory() == ApplicationErrorCategory.BENIGN); + assertEquals("Benign activity failure", af2.getOriginalMessage()); + + // Expect metrics to remain unchanged for benign failure + reporter.assertCounter( + MetricsType.ACTIVITY_EXEC_FAILED_COUNTER, + getActivityTagsWithWorkerType("ActivityWorker", "ActivityWorkflow"), + 1); + + // Verify log levels + assertEquals(countLogMessages("Activity failure.", Level.WARN), 1); + assertEquals(countLogMessages("Activity failure.", Level.DEBUG), 1); + } + + @Test + public void localActivityFailureMetricBenignApplicationError() { + reporter.assertNoMetric( + MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER, + getActivityTagsWithWorkerType("LocalActivityWorker", "LocalActivityWorkflow")); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + + WorkflowFailedException nonBenignErr = + assertThrows( + WorkflowFailedException.class, + () -> + client + .newWorkflowStub( + LocalActivityWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + 
.execute(false)); + + assertTrue( + "Cause should be ActivityFailure", + nonBenignErr.getCause() instanceof io.temporal.failure.ActivityFailure); + assertTrue( + "Inner cause should be ApplicationFailure", + nonBenignErr.getCause().getCause() instanceof ApplicationFailure); + ApplicationFailure af = (ApplicationFailure) nonBenignErr.getCause().getCause(); + assertFalse( + "Failure should not be benign", + af.getApplicationErrorCategory() == ApplicationErrorCategory.BENIGN); + assertEquals("Non-benign activity failure", af.getOriginalMessage()); + + // Expect metrics to be incremented for non-benign failure + reporter.assertCounter( + MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER, + getActivityTagsWithWorkerType("LocalActivityWorker", "LocalActivityWorkflow"), + 1); + + WorkflowFailedException benignErr = + assertThrows( + WorkflowFailedException.class, + () -> + client + .newWorkflowStub( + LocalActivityWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(true)); + + assertTrue( + "Cause should be ActivityFailure", + benignErr.getCause() instanceof io.temporal.failure.ActivityFailure); + assertTrue( + "Inner cause should be ApplicationFailure", + benignErr.getCause().getCause() instanceof ApplicationFailure); + ApplicationFailure af2 = (ApplicationFailure) benignErr.getCause().getCause(); + assertTrue( + "Failure should be benign", + af2.getApplicationErrorCategory() == ApplicationErrorCategory.BENIGN); + assertEquals("Benign activity failure", af2.getOriginalMessage()); + + // Expect metrics to remain unchanged for benign failure + reporter.assertCounter( + MetricsType.LOCAL_ACTIVITY_EXEC_FAILED_COUNTER, + getActivityTagsWithWorkerType("LocalActivityWorker", "LocalActivityWorkflow"), + 1); + + // Verify log levels + assertEquals(countLogMessages("Local activity failure.", Level.WARN), 1); + assertEquals(countLogMessages("Local activity failure.", Level.DEBUG), 1); + } +} diff 
--git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java index f74c8bc1d5..32dc79d4e0 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowFailedMetricsTests.java @@ -20,7 +20,10 @@ package io.temporal.internal.worker; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import com.uber.m3.tally.RootScopeBuilder; import com.uber.m3.tally.Scope; @@ -30,7 +33,9 @@ import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowOptions; import io.temporal.common.reporter.TestStatsReporter; +import io.temporal.failure.ApplicationErrorCategory; import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.TemporalFailure; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.worker.MetricsType; import io.temporal.worker.NonDeterministicException; @@ -61,7 +66,10 @@ public class WorkflowFailedMetricsTests { NonDeterministicException.class, IllegalArgumentException.class) .build()) .setMetricsScope(metricsScope) - .setWorkflowTypes(NonDeterministicWorkflowImpl.class, WorkflowExceptionImpl.class) + .setWorkflowTypes( + NonDeterministicWorkflowImpl.class, + WorkflowExceptionImpl.class, + ApplicationFailureWorkflowImpl.class) .build(); @Before @@ -84,6 +92,12 @@ public interface TestWorkflow { String workflow(boolean runtimeException); } + @WorkflowInterface + public interface ApplicationFailureWorkflow { + @WorkflowMethod + void execute(boolean isBenign); + } + public static class NonDeterministicWorkflowImpl implements TestWorkflowWithSignal { @Override public String workflow() { @@ -110,6 +124,18 @@ public String workflow(boolean runtimeException) { } } 
+ public static class ApplicationFailureWorkflowImpl implements ApplicationFailureWorkflow { + @Override + public void execute(boolean isBenign) { + if (!isBenign) { + throw ApplicationFailure.newFailure("Non-benign failure", "NonBenignType"); + } else { + throw ApplicationFailure.newFailureWithCategory( + "Benign failure", "BenignType", ApplicationErrorCategory.BENIGN, null); + } + } + } + private Map getWorkflowTags(String workflowType) { return ImmutableMap.of( "task_queue", @@ -168,4 +194,62 @@ public void applicationFailureWorkflowFailedMetric() { assertThrows(WorkflowFailedException.class, () -> workflow.workflow(false)); reporter.assertCounter(MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("TestWorkflow"), 1); } + + @Test + public void workflowFailureMetricBenignApplicationError() { + reporter.assertNoMetric( + MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("ApplicationFailureWorkflow")); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + ApplicationFailureWorkflow nonBenignStub = + client.newWorkflowStub( + ApplicationFailureWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + + WorkflowFailedException e1 = + assertThrows(WorkflowFailedException.class, () -> nonBenignStub.execute(false)); + + Throwable cause1 = e1.getCause(); + assertTrue("Cause should be ApplicationFailure", cause1 instanceof ApplicationFailure); + boolean isBenign = + ((ApplicationFailure) cause1).getApplicationErrorCategory() + == ApplicationErrorCategory.BENIGN; + assertFalse("Failure should not be benign", isBenign); + assertEquals("Non-benign failure", ((TemporalFailure) cause1).getOriginalMessage()); + + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("ApplicationFailureWorkflow"), 1); + + ApplicationFailureWorkflow benignStub = + client.newWorkflowStub( + ApplicationFailureWorkflow.class, + WorkflowOptions.newBuilder() + 
.setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + + WorkflowFailedException e2 = + assertThrows( + WorkflowFailedException.class, + () -> + client + .newWorkflowStub( + ApplicationFailureWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()) + .execute(true)); + + Throwable cause2 = e2.getCause(); + assertTrue("Cause should be ApplicationFailure", cause2 instanceof ApplicationFailure); + isBenign = + ((ApplicationFailure) cause2).getApplicationErrorCategory() + == ApplicationErrorCategory.BENIGN; + assertTrue("Failure should be benign", isBenign); + assertEquals("Benign failure", ((TemporalFailure) cause2).getOriginalMessage()); + + reporter.assertCounter( + MetricsType.WORKFLOW_FAILED_COUNTER, getWorkflowTags("ApplicationFailureWorkflow"), 1); + } } diff --git a/temporal-serviceclient/src/main/proto b/temporal-serviceclient/src/main/proto index 7f48823155..d7bb01b95f 160000 --- a/temporal-serviceclient/src/main/proto +++ b/temporal-serviceclient/src/main/proto @@ -1 +1 @@ -Subproject commit 7f4882315504ef88b6e6e9bc371e75dc1fbda844 +Subproject commit d7bb01b95f5ddff01799023c426d43a66e384c7d