diff --git a/build.gradle b/build.gradle index 9a7db86d21..360feec1a8 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { // Platforms grpcVersion = '1.58.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager jacksonVersion = '2.14.2' // [2.9.0,) - nexusVersion = '0.3.0-alpha' + nexusVersion = '0.4.0-alpha' // we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though. micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,) diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java index f5420c7b7d..f7533b0faf 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingNexusOperationInboundCallsInterceptor.java @@ -20,7 +20,7 @@ package io.temporal.opentracing.internal; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -49,8 +49,7 @@ public OpenTracingNexusOperationInboundCallsInterceptor( } @Override - public StartOperationOutput startOperation(StartOperationInput input) - throws OperationUnsuccessfulException { + public StartOperationOutput startOperation(StartOperationInput input) throws OperationException { SpanContext rootSpanContext = contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer); diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java index a01d63dcec..4e1f78c1b2 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/CodecDataConverter.java @@ -29,7 +29,6 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.failure.v1.ResetWorkflowFailureInfo; import io.temporal.api.failure.v1.TimeoutFailureInfo; -import io.temporal.failure.TemporalFailure; import io.temporal.payload.codec.ChainCodec; import io.temporal.payload.codec.PayloadCodec; import io.temporal.payload.context.SerializationContext; @@ -199,7 +198,7 @@ public Failure exceptionToFailure(@Nonnull Throwable throwable) { @Override @Nonnull - public TemporalFailure failureToException(@Nonnull Failure failure) { + public RuntimeException failureToException(@Nonnull Failure failure) { Preconditions.checkNotNull(failure, "failure"); return ConverterUtils.withContext(dataConverter, serializationContext) .failureToException(this.decodeFailure(failure.toBuilder()).build()); diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java index 9aee115213..c033e90047 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/DataConverter.java @@ -28,7 +28,6 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.common.Experimental; import io.temporal.failure.DefaultFailureConverter; -import io.temporal.failure.TemporalFailure; import io.temporal.payload.codec.PayloadCodec; import io.temporal.payload.context.SerializationContext; import java.lang.reflect.Type; @@ -176,7 +175,7 @@ default Object[] fromPayloads( * @throws NullPointerException if failure is null */ @Nonnull - default TemporalFailure failureToException(@Nonnull Failure failure) { + default RuntimeException failureToException(@Nonnull Failure failure) { Preconditions.checkNotNull(failure, "failure"); return new DefaultFailureConverter().failureToException(failure, this); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/FailureConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/FailureConverter.java index 3aaccb4583..48a9a7a692 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/FailureConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/FailureConverter.java @@ -22,7 +22,6 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.failure.DefaultFailureConverter; -import io.temporal.failure.TemporalFailure; import io.temporal.payload.context.SerializationContext; import javax.annotation.Nonnull; @@ -49,7 +48,7 @@ public interface FailureConverter { * @throws NullPointerException if either failure or dataConverter is null */ @Nonnull - TemporalFailure failureToException( + RuntimeException failureToException( @Nonnull Failure failure, @Nonnull DataConverter dataConverter); /** diff --git a/temporal-sdk/src/main/java/io/temporal/common/converter/PayloadAndFailureDataConverter.java b/temporal-sdk/src/main/java/io/temporal/common/converter/PayloadAndFailureDataConverter.java index 2ab0d21e82..53899d7041 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/converter/PayloadAndFailureDataConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/common/converter/PayloadAndFailureDataConverter.java @@ -28,7 +28,6 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.api.failure.v1.Failure; import io.temporal.failure.DefaultFailureConverter; -import io.temporal.failure.TemporalFailure; import io.temporal.payload.context.SerializationContext; import java.lang.reflect.Type; import java.util.*; @@ -135,7 +134,7 @@ public T fromPayloads( @Override @Nonnull - public TemporalFailure failureToException(@Nonnull Failure failure) { + public RuntimeException failureToException(@Nonnull Failure failure) { Preconditions.checkNotNull(failure, "failure"); return (serializationContext != null ? failureConverter.withContext(serializationContext) diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java index fda964191e..44a9da5849 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptor.java @@ -20,7 +20,7 @@ package io.temporal.common.interceptors; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.nexusrpc.handler.*; import io.temporal.common.Experimental; @@ -103,10 +103,9 @@ final class CancelOperationOutput {} * * @param input input to the operation start. * @return result of the operation start. - * @throws OperationUnsuccessfulException if the operation start failed. + * @throws io.nexusrpc.OperationException if the operation start failed. */ - StartOperationOutput startOperation(StartOperationInput input) - throws OperationUnsuccessfulException; + StartOperationOutput startOperation(StartOperationInput input) throws OperationException; /** * Intercepts a call to cancel a Nexus operation. diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java index b523a9c6c6..40dbb1f1c2 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/NexusOperationInboundCallsInterceptorBase.java @@ -20,7 +20,7 @@ package io.temporal.common.interceptors; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.temporal.common.Experimental; /** Convenience base class for {@link NexusOperationInboundCallsInterceptor} implementations. */ @@ -39,8 +39,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { } @Override - public StartOperationOutput startOperation(StartOperationInput input) - throws OperationUnsuccessfulException { + public StartOperationOutput startOperation(StartOperationInput input) throws OperationException { return next.startOperation(input); } diff --git a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java index 0af7e9739e..3a255f31c6 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/DefaultFailureConverter.java @@ -23,9 +23,11 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; +import io.nexusrpc.handler.HandlerException; import io.temporal.api.common.v1.ActivityType; import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowType; +import io.temporal.api.enums.v1.NexusHandlerErrorRetryBehavior; import io.temporal.api.failure.v1.*; import io.temporal.client.ActivityCanceledException; import io.temporal.common.converter.DataConverter; @@ -72,12 +74,14 @@ public final class DefaultFailureConverter implements FailureConverter { @Override @Nonnull - public TemporalFailure failureToException( + public RuntimeException failureToException( @Nonnull Failure failure, @Nonnull DataConverter dataConverter) { Preconditions.checkNotNull(failure, "failure"); Preconditions.checkNotNull(dataConverter, "dataConverter"); - TemporalFailure result = failureToExceptionImpl(failure, dataConverter); - result.setFailure(failure); + RuntimeException result = failureToExceptionImpl(failure, dataConverter); + if (result instanceof TemporalFailure) { + ((TemporalFailure) result).setFailure(failure); + } if (failure.getSource().equals(JAVA_SDK) && !failure.getStackTrace().isEmpty()) { StackTraceElement[] stackTrace = parseStackTrace(failure.getStackTrace()); result.setStackTrace(stackTrace); @@ -85,8 +89,8 @@ public TemporalFailure failureToException( return result; } - private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter dataConverter) { - TemporalFailure cause = + private RuntimeException failureToExceptionImpl(Failure failure, DataConverter dataConverter) { + Exception cause = failure.hasCause() ? failureToException(failure.getCause(), dataConverter) : null; switch (failure.getFailureInfoCase()) { case APPLICATION_FAILURE_INFO: @@ -184,9 +188,23 @@ private TemporalFailure failureToExceptionImpl(Failure failure, DataConverter da info.getEndpoint(), info.getService(), info.getOperation(), - info.getOperationId(), + info.getOperationToken().isEmpty() ? info.getOperationId() : info.getOperationToken(), cause); } + case NEXUS_HANDLER_FAILURE_INFO: + { + NexusHandlerFailureInfo info = failure.getNexusHandlerFailureInfo(); + HandlerException.RetryBehavior retryBehavior = HandlerException.RetryBehavior.UNSPECIFIED; + switch (info.getRetryBehavior()) { + case NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: + retryBehavior = HandlerException.RetryBehavior.RETRYABLE; + break; + case NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: + retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE; + break; + } + return new HandlerException(info.getType(), cause, retryBehavior); + } case FAILUREINFO_NOT_SET: default: // All unknown types are considered to be retryable ApplicationError. @@ -302,14 +320,34 @@ private Failure exceptionToFailure(Throwable throwable) { failure.setCanceledFailureInfo(info); } else if (throwable instanceof NexusOperationFailure) { NexusOperationFailure no = (NexusOperationFailure) throwable; - NexusOperationFailureInfo.Builder info = + NexusOperationFailureInfo.Builder op = NexusOperationFailureInfo.newBuilder() .setScheduledEventId(no.getScheduledEventId()) .setEndpoint(no.getEndpoint()) .setService(no.getService()) .setOperation(no.getOperation()) - .setOperationId(no.getOperationId()); - failure.setNexusOperationExecutionFailureInfo(info); + .setOperationId(no.getOperationToken()) + .setOperationToken(no.getOperationToken()); + failure.setNexusOperationExecutionFailureInfo(op); + } else if (throwable instanceof HandlerException) { + HandlerException he = (HandlerException) throwable; + NexusHandlerErrorRetryBehavior retryBehavior = + NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_UNSPECIFIED; + switch (he.getRetryBehavior()) { + case RETRYABLE: + retryBehavior = + NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE; + break; + case NON_RETRYABLE: + retryBehavior = + NexusHandlerErrorRetryBehavior.NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE; + break; + } + NexusHandlerFailureInfo.Builder info = + NexusHandlerFailureInfo.newBuilder() + .setType(he.getRawErrorType()) + .setRetryBehavior(retryBehavior); + failure.setNexusHandlerFailureInfo(info); } else { ApplicationFailureInfo.Builder info = ApplicationFailureInfo.newBuilder() diff --git a/temporal-sdk/src/main/java/io/temporal/failure/NexusOperationFailure.java b/temporal-sdk/src/main/java/io/temporal/failure/NexusOperationFailure.java index 634435bb41..a9e65f6431 100644 --- a/temporal-sdk/src/main/java/io/temporal/failure/NexusOperationFailure.java +++ b/temporal-sdk/src/main/java/io/temporal/failure/NexusOperationFailure.java @@ -35,7 +35,7 @@ public final class NexusOperationFailure extends TemporalFailure { private final String endpoint; private final String service; private final String operation; - private final String operationId; + private final String operationToken; public NexusOperationFailure( String message, @@ -43,17 +43,17 @@ public NexusOperationFailure( String endpoint, String service, String operation, - String operationId, + String operationToken, Throwable cause) { super( - getMessage(message, scheduledEventId, endpoint, service, operation, operationId), + getMessage(message, scheduledEventId, endpoint, service, operation, operationToken), message, cause); this.scheduledEventId = scheduledEventId; this.endpoint = endpoint; this.service = service; this.operation = operation; - this.operationId = operationId; + this.operationToken = operationToken; } public static String getMessage( @@ -62,7 +62,7 @@ public static String getMessage( String endpoint, String service, String operation, - String operationId) { + String operationToken) { return "Nexus Operation with operation='" + operation + "service='" @@ -74,7 +74,7 @@ public static String getMessage( + "'. " + "scheduledEventId=" + scheduledEventId - + (operationId == null ? "" : ", operationId=" + operationId); + + (operationToken == null ? "" : ", operationToken=" + operationToken); } public long getScheduledEventId() { @@ -93,7 +93,7 @@ public String getOperation() { return operation; } - public String getOperationId() { - return operationId; + public String getOperationToken() { + return operationToken; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index a0e0798467..bf46d4eed8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -79,6 +79,7 @@ public static Object getValueOrDefault(Object value, Class valueClass) { * @return a new stub bound to the same workflow as the given stub, but with the Nexus callback * URL and headers set */ + @SuppressWarnings("deprecation") // Check the OPERATION_ID header for backwards compatibility public static WorkflowStub createNexusBoundStub( WorkflowStub stub, NexusStartWorkflowRequest request) { if (!stub.getOptions().isPresent()) { @@ -103,6 +104,9 @@ public static WorkflowStub createNexusBoundStub( if (!headers.containsKey(Header.OPERATION_ID)) { headers.put(Header.OPERATION_ID.toLowerCase(), options.getWorkflowId()); } + if (!headers.containsKey(Header.OPERATION_TOKEN)) { + headers.put(Header.OPERATION_TOKEN.toLowerCase(), options.getWorkflowId()); + } WorkflowOptions.Builder nexusWorkflowOptions = WorkflowOptions.newBuilder(options) .setRequestId(request.getRequestId()) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java b/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java index dc6ee53f54..446dea7fd2 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/NexusUtil.java @@ -20,12 +20,26 @@ package io.temporal.internal.common; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; import io.nexusrpc.Link; +import io.temporal.api.nexus.v1.Failure; +import io.temporal.common.converter.DataConverter; import java.net.URI; import java.net.URISyntaxException; import java.time.Duration; +import java.util.Collections; +import java.util.Map; public class NexusUtil { + private static final JsonFormat.Printer JSON_PRINTER = + JsonFormat.printer().omittingInsignificantWhitespace(); + private static final String TEMPORAL_FAILURE_TYPE_STRING = + io.temporal.api.failure.v1.Failure.getDescriptor().getFullName(); + private static final Map NEXUS_FAILURE_METADATA = + Collections.singletonMap("type", TEMPORAL_FAILURE_TYPE_STRING); + public static Duration parseRequestTimeout(String timeout) { try { if (timeout.endsWith("m")) { @@ -53,5 +67,23 @@ public static Link nexusProtoLinkToLink(io.temporal.api.nexus.v1.Link nexusLink) .build(); } + public static Failure exceptionToNexusFailure(Throwable exception, DataConverter dataConverter) { + io.temporal.api.failure.v1.Failure failure = dataConverter.exceptionToFailure(exception); + String details; + try { + details = JSON_PRINTER.print(failure.toBuilder().setMessage("").build()); + } catch (InvalidProtocolBufferException e) { + return Failure.newBuilder() + .setMessage("Failed to serialize failure details") + .setDetails(ByteString.copyFromUtf8(e.getMessage())) + .build(); + } + return Failure.newBuilder() + .setMessage(failure.getMessage()) + .setDetails(ByteString.copyFromUtf8(details)) + .putAllMetadata(NEXUS_FAILURE_METADATA) + .build(); + } + private NexusUtil() {} } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java index ead9ae2989..d79bcab561 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/NexusTaskHandlerImpl.java @@ -20,13 +20,12 @@ package io.temporal.internal.nexus; +import static io.temporal.internal.common.NexusUtil.exceptionToNexusFailure; import static io.temporal.internal.common.NexusUtil.nexusProtoLinkToLink; -import com.google.protobuf.ByteString; import com.uber.m3.tally.Scope; -import io.nexusrpc.FailureInfo; import io.nexusrpc.Header; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.nexusrpc.handler.*; import io.temporal.api.common.v1.Payload; import io.temporal.api.nexus.v1.*; @@ -54,6 +53,7 @@ public class NexusTaskHandlerImpl implements NexusTaskHandler { private static final Logger log = LoggerFactory.getLogger(NexusTaskHandlerImpl.class); + private final DataConverter dataConverter; private final String namespace; private final String taskQueue; @@ -123,12 +123,9 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException timeout.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS); } catch (IllegalArgumentException e) { - return new Result( - HandlerError.newBuilder() - .setErrorType(OperationHandlerException.ErrorType.BAD_REQUEST.toString()) - .setFailure( - Failure.newBuilder().setMessage("cannot parse request timeout").build()) - .build()); + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new RuntimeException("Invalid request timeout header", e)); } } @@ -145,23 +142,21 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException handleCancelledOperation(ctx, request.getCancelOperation()); return new Result(Response.newBuilder().setCancelOperation(cancelResponse).build()); default: - return new Result( - HandlerError.newBuilder() - .setErrorType(OperationHandlerException.ErrorType.NOT_IMPLEMENTED.toString()) - .setFailure(Failure.newBuilder().setMessage("unknown request type").build()) - .build()); + throw new HandlerException( + HandlerException.ErrorType.NOT_IMPLEMENTED, + new RuntimeException("Unknown request type: " + request.getVariantCase())); } - } catch (OperationHandlerException e) { + } catch (HandlerException e) { return new Result( HandlerError.newBuilder() .setErrorType(e.getErrorType().toString()) - .setFailure(createFailure(e.getFailureInfo())) + .setFailure(exceptionToNexusFailure(e.getCause(), dataConverter)) .build()); } catch (Throwable e) { return new Result( HandlerError.newBuilder() - .setErrorType(OperationHandlerException.ErrorType.INTERNAL.toString()) - .setFailure(Failure.newBuilder().setMessage(e.toString()).build()) + .setErrorType(HandlerException.ErrorType.INTERNAL.toString()) + .setFailure(exceptionToNexusFailure(e, dataConverter)) .build()); } finally { // If the task timed out, we should not send a response back to the server @@ -176,20 +171,6 @@ public Result handle(NexusTask task, Scope metricsScope) throws TimeoutException } } - private Failure createFailure(FailureInfo failInfo) { - Failure.Builder failure = Failure.newBuilder(); - if (failInfo.getMessage() != null) { - failure.setMessage(failInfo.getMessage()); - } - if (failInfo.getDetailsJson() != null) { - failure.setDetails(ByteString.copyFromUtf8(failInfo.getDetailsJson())); - } - if (!failInfo.getMetadata().isEmpty()) { - failure.putAllMetadata(failInfo.getMetadata()); - } - return failure.build(); - } - private void cancelOperation(OperationContext context, OperationCancelDetails details) { try { serviceHandler.cancelOperation(context, details); @@ -210,7 +191,12 @@ private CancelOperationResponse handleCancelledOperation( ctx.setService(task.getService()).setOperation(task.getOperation()); OperationCancelDetails operationCancelDetails = - OperationCancelDetails.newBuilder().setOperationId(task.getOperationId()).build(); + OperationCancelDetails.newBuilder() + .setOperationToken( + task.getOperationToken().isEmpty() + ? task.getOperationId() + : task.getOperationToken()) + .build(); try { cancelOperation(ctx.build(), operationCancelDetails); } catch (Throwable failure) { @@ -222,17 +208,13 @@ private CancelOperationResponse handleCancelledOperation( private void convertKnownFailures(Throwable e) { Throwable failure = CheckedExceptionWrapper.unwrap(e); + if (failure instanceof WorkflowException) { + throw new HandlerException(HandlerException.ErrorType.BAD_REQUEST, failure); + } if (failure instanceof ApplicationFailure) { if (((ApplicationFailure) failure).isNonRetryable()) { - throw new OperationHandlerException( - OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage()); + throw new HandlerException(HandlerException.ErrorType.BAD_REQUEST, failure); } - throw new OperationHandlerException( - OperationHandlerException.ErrorType.INTERNAL, failure.getMessage()); - } - if (failure instanceof WorkflowException) { - throw new OperationHandlerException( - OperationHandlerException.ErrorType.BAD_REQUEST, failure.getMessage()); } if (failure instanceof Error) { throw (Error) failure; @@ -244,7 +226,7 @@ private void convertKnownFailures(Throwable e) { private OperationStartResult startOperation( OperationContext context, OperationStartDetails details, HandlerInputContent input) - throws OperationUnsuccessfulException { + throws OperationException { try { return serviceHandler.startOperation(context, details, input); } catch (Throwable e) { @@ -275,10 +257,9 @@ private StartOperationResponse handleStartOperation( operationStartDetails.addLink(nexusProtoLinkToLink(link)); } catch (URISyntaxException e) { log.error("failed to parse link url: " + link.getUrl(), e); - throw new OperationHandlerException( - OperationHandlerException.ErrorType.BAD_REQUEST, - "Invalid link URL: " + link.getUrl(), - e); + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + new RuntimeException("Invalid link URL: " + link.getUrl(), e)); } }); @@ -286,37 +267,43 @@ private StartOperationResponse handleStartOperation( HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput()); StartOperationResponse.Builder startResponseBuilder = StartOperationResponse.newBuilder(); + OperationContext context = ctx.build(); try { - OperationStartResult result = - startOperation(ctx.build(), operationStartDetails.build(), input.build()); - if (result.isSync()) { - startResponseBuilder.setSyncSuccess( - StartOperationResponse.Sync.newBuilder() - .setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes())) - .build()); - } else { - startResponseBuilder.setAsyncSuccess( - StartOperationResponse.Async.newBuilder() - .setOperationId(result.getAsyncOperationId()) - .addAllLinks( - result.getLinks().stream() - .map( - link -> - io.temporal.api.nexus.v1.Link.newBuilder() - .setType(link.getType()) - .setUrl(link.getUri().toString()) - .build()) - .collect(Collectors.toList())) - .build()); + try { + OperationStartResult result = + startOperation(context, operationStartDetails.build(), input.build()); + if (result.isSync()) { + startResponseBuilder.setSyncSuccess( + StartOperationResponse.Sync.newBuilder() + .setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes())) + .build()); + } else { + startResponseBuilder.setAsyncSuccess( + StartOperationResponse.Async.newBuilder() + .setOperationId(result.getAsyncOperationToken()) + .setOperationToken(result.getAsyncOperationToken()) + .addAllLinks( + context.getLinks().stream() + .map( + link -> + io.temporal.api.nexus.v1.Link.newBuilder() + .setType(link.getType()) + .setUrl(link.getUri().toString()) + .build()) + .collect(Collectors.toList())) + .build()); + } + } catch (OperationException e) { + throw e; + } catch (Throwable failure) { + convertKnownFailures(failure); } - } catch (OperationUnsuccessfulException e) { + } catch (OperationException e) { startResponseBuilder.setOperationError( UnsuccessfulOperationError.newBuilder() .setOperationState(e.getState().toString().toLowerCase()) - .setFailure(createFailure(e.getFailureInfo())) + .setFailure(exceptionToNexusFailure(e.getCause(), dataConverter)) .build()); - } catch (Throwable failure) { - convertKnownFailures(failure); } return startResponseBuilder.build(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java index 5ddada3ac8..0f8757309f 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/RootNexusOperationInboundCallsInterceptor.java @@ -20,7 +20,7 @@ package io.temporal.internal.nexus; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationStartResult; import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; @@ -40,8 +40,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { } @Override - public StartOperationOutput startOperation(StartOperationInput input) - throws OperationUnsuccessfulException { + public StartOperationOutput startOperation(StartOperationInput input) throws OperationException { OperationStartResult result = operationInterceptor.start( input.getOperationContext(), input.getStartDetails(), input.getInput()); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java index b9536f13c3..7ae68b79e9 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/nexus/TemporalInterceptorMiddleware.java @@ -20,8 +20,8 @@ package io.temporal.internal.nexus; +import io.nexusrpc.OperationException; import io.nexusrpc.OperationInfo; -import io.nexusrpc.OperationUnsuccessfulException; import io.nexusrpc.handler.*; import io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor; import io.temporal.common.interceptors.WorkerInterceptor; @@ -61,7 +61,7 @@ public OperationInterceptorConverter(NexusOperationInboundCallsInterceptor next) @Override public OperationStartResult start( OperationContext operationContext, OperationStartDetails operationStartDetails, Object o) - throws OperationUnsuccessfulException { + throws OperationException { return next.startOperation( new NexusOperationInboundCallsInterceptor.StartOperationInput( operationContext, operationStartDetails, o)) @@ -71,14 +71,14 @@ public OperationStartResult start( @Override public Object fetchResult( OperationContext operationContext, OperationFetchResultDetails operationFetchResultDetails) - throws OperationHandlerException { + throws OperationException { throw new UnsupportedOperationException("Not implemented"); } @Override public OperationInfo fetchInfo( OperationContext operationContext, OperationFetchInfoDetails operationFetchInfoDetails) - throws OperationHandlerException { + throws HandlerException { throw new UnsupportedOperationException("Not implemented"); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java index 6d2555c592..508171c2bd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java @@ -191,9 +191,12 @@ private void notifyStarted() { startedCallback.apply(Optional.empty(), null); } else { async = true; + String operationToken = + currentEvent.getNexusOperationStartedEventAttributes().getOperationToken(); + String operationId = + currentEvent.getNexusOperationStartedEventAttributes().getOperationId(); startedCallback.apply( - Optional.of(currentEvent.getNexusOperationStartedEventAttributes().getOperationId()), - null); + Optional.of(operationToken.isEmpty() ? operationId : operationToken), null); } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java index cf373abfc7..f5c4d220a3 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java @@ -25,14 +25,14 @@ public class NexusOperationExecutionImpl implements NexusOperationExecution { - private final Optional operationId; + private final Optional operationToken; - public NexusOperationExecutionImpl(Optional operationId) { - this.operationId = operationId; + public NexusOperationExecutionImpl(Optional operationToken) { + this.operationToken = operationToken; } @Override - public Optional getOperationId() { - return operationId; + public Optional getOperationToken() { + return operationToken; } } diff --git a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java index 2ae330d7f8..f345ce0052 100644 --- a/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/nexus/WorkflowRunOperationImpl.java @@ -74,13 +74,14 @@ public OperationStartResult start( OperationStartResult.Builder result = OperationStartResult.newAsyncBuilder(workflowExec.getWorkflowId()); if (nexusLink != null) { - result.addLink(nexusProtoLinkToLink(nexusLink)); + ctx.addLinks(nexusProtoLinkToLink(nexusLink)); } return result.build(); } catch (URISyntaxException e) { // Not expected as the link is constructed by the SDK. - throw new OperationHandlerException( - OperationHandlerException.ErrorType.INTERNAL, "failed to construct result URL", e); + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + new IllegalArgumentException("failed to parse URI", e)); } } @@ -100,6 +101,6 @@ public OperationInfo fetchInfo( public void cancel( OperationContext operationContext, OperationCancelDetails operationCancelDetails) { WorkflowClient client = CurrentNexusOperationContext.get().getWorkflowClient(); - client.newUntypedWorkflowStub(operationCancelDetails.getOperationId()).cancel(); + client.newUntypedWorkflowStub(operationCancelDetails.getOperationToken()).cancel(); } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationExecution.java b/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationExecution.java index ae6ba353b4..d9f28aa8e0 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationExecution.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/NexusOperationExecution.java @@ -25,8 +25,8 @@ /** NexusOperationExecution identifies a specific Nexus operation execution. */ public interface NexusOperationExecution { /** - * @return the Operation ID as set by the Operation's handler. May be empty if the operation + * @return the Operation token as set by the Operation's handler. May be empty if the operation * hasn't started yet or completed synchronously. */ - Optional getOperationId(); + Optional getOperationToken(); } diff --git a/temporal-sdk/src/test/java/io/temporal/common/converter/CodecDataConverterTest.java b/temporal-sdk/src/test/java/io/temporal/common/converter/CodecDataConverterTest.java index 7df75e1359..fb8d484bd3 100644 --- a/temporal-sdk/src/test/java/io/temporal/common/converter/CodecDataConverterTest.java +++ b/temporal-sdk/src/test/java/io/temporal/common/converter/CodecDataConverterTest.java @@ -88,7 +88,8 @@ public void testMessageAndStackTraceAreCorrectlyDecoded() { throw ApplicationFailure.newFailureWithCause("Message", "Type", causeException); } catch (ApplicationFailure originalException) { Failure failure = dataConverter.exceptionToFailure(originalException); - TemporalFailure decodedException = dataConverter.failureToException(failure); + TemporalFailure decodedException = + (TemporalFailure) dataConverter.failureToException(failure); assertEquals("Message", decodedException.getOriginalMessage()); assertEquals( diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java index e7eac1318e..eb9f515f74 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/AsyncWorkflowOperationTest.java @@ -24,6 +24,8 @@ import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; import io.temporal.client.WorkflowOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.NexusOperationFailure; import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowRunOperation; import io.temporal.testing.WorkflowReplayer; @@ -86,15 +88,33 @@ public String execute(String input) { Workflow.startNexusOperation(serviceStub::operation, "block"); NexusOperationExecution asyncExec = asyncOpHandle.getExecution().get(); // Execution id is present for an asynchronous operations - Assert.assertTrue("Operation id should be present", asyncExec.getOperationId().isPresent()); + Assert.assertTrue( + "Operation token should be present", asyncExec.getOperationToken().isPresent()); // Result should only be completed if the operation is completed Assert.assertFalse("Result should not be completed", asyncOpHandle.getResult().isCompleted()); - Assert.assertTrue(asyncExec.getOperationId().get().startsWith(WORKFLOW_ID_PREFIX)); + Assert.assertTrue(asyncExec.getOperationToken().get().startsWith(WORKFLOW_ID_PREFIX)); // Unblock the operation - Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationId().get()) + Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationToken().get()) .unblock(); // Wait for the operation to complete Assert.assertEquals("Hello from operation workflow block", asyncOpHandle.getResult().get()); + // Try to call an asynchronous operation that will fail + try { + String ignore = serviceStub.operation("fail"); + } catch (NexusOperationFailure e) { + Assert.assertEquals("TestNexusService1", e.getService()); + Assert.assertEquals("operation", e.getOperation()); + Assert.assertTrue(e.getOperationToken().startsWith(WORKFLOW_ID_PREFIX)); + Assert.assertTrue(e.getCause() instanceof ApplicationFailure); + ApplicationFailure applicationFailure = (ApplicationFailure) e.getCause(); + Assert.assertEquals("simulated failure", applicationFailure.getOriginalMessage()); + Assert.assertEquals("SimulatedFailureType", applicationFailure.getType()); + Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); + Assert.assertTrue(applicationFailure.getCause() instanceof ApplicationFailure); + ApplicationFailure cause = (ApplicationFailure) applicationFailure.getCause(); + Assert.assertEquals("simulated cause", cause.getOriginalMessage()); + Assert.assertEquals("SimulatedCause", cause.getType()); + } return asyncResult; } } @@ -115,6 +135,18 @@ public static class TestOperationWorkflow implements OperationWorkflow { public String execute(String arg) { if (arg.equals("block")) { Workflow.await(() -> unblocked); + } else if (arg.equals("fail")) { + throw ApplicationFailure.newFailureWithCause( + "simulated failure", + "SimulatedFailureType", + ApplicationFailure.newFailure("simulated cause", "SimulatedCause"), + "foo"); + } else if (arg.equals("ignore-cancel")) { + Workflow.newDetachedCancellationScope( + () -> { + Workflow.await(() -> unblocked); + }) + .run(); } return "Hello from operation workflow " + arg; } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java deleted file mode 100644 index dc0e1cda72..0000000000 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/CancelAsyncOperationTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this material except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.temporal.workflow.nexus; - -import static org.junit.Assume.assumeFalse; - -import io.nexusrpc.handler.OperationHandler; -import io.nexusrpc.handler.OperationImpl; -import io.nexusrpc.handler.ServiceImpl; -import io.temporal.client.WorkflowFailedException; -import io.temporal.client.WorkflowOptions; -import io.temporal.failure.CanceledFailure; -import io.temporal.failure.NexusOperationFailure; -import io.temporal.nexus.Nexus; -import io.temporal.nexus.WorkflowRunOperation; -import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.testing.internal.TracingWorkerInterceptor; -import io.temporal.workflow.*; -import io.temporal.workflow.shared.TestNexusServices; -import io.temporal.workflow.shared.TestWorkflows; -import java.time.Duration; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -public class CancelAsyncOperationTest { - @Rule - public SDKTestWorkflowRule testWorkflowRule = - SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(TestNexus.class, AsyncWorkflowOperationTest.TestOperationWorkflow.class) - .setNexusServiceImplementation(new TestNexusServiceImpl()) - .build(); - - @Before - public void checkRealServer() { - assumeFalse( - "Test flakes on real server because of delays in the Nexus Registry", - SDKTestWorkflowRule.useExternalService); - } - - @Test - public void asyncOperationImmediatelyCancelled() { - TestWorkflows.TestWorkflow1 workflowStub = - testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); - WorkflowFailedException exception = - Assert.assertThrows( - WorkflowFailedException.class, () -> workflowStub.execute("immediately")); - Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); - NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); - Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure); - CanceledFailure canceledFailure = (CanceledFailure) nexusFailure.getCause(); - Assert.assertEquals( - "operation canceled before it was started", canceledFailure.getOriginalMessage()); - - testWorkflowRule - .getInterceptor(TracingWorkerInterceptor.class) - .setExpected( - "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, - "newThread workflow-method", - "executeNexusOperation TestNexusService1 operation", - "startNexusOperation TestNexusService1 operation", - "cancelNexusOperation TestNexusService1 operation"); - } - - @Test - public void asyncOperationCancelled() { - TestWorkflows.TestWorkflow1 workflowStub = - testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); - WorkflowFailedException exception = - Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("")); - Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); - NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); - Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure); - - testWorkflowRule - .getInterceptor(TracingWorkerInterceptor.class) - .setExpected( - "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, - "newThread workflow-method", - "executeNexusOperation TestNexusService1 operation", - "startNexusOperation TestNexusService1 operation", - "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, - "registerSignalHandlers unblock", - "newThread workflow-method", - "await await", - "cancelNexusOperation TestNexusService1 operation"); - } - - public static class TestNexus implements TestWorkflows.TestWorkflow1 { - @Override - public String execute(String input) { - NexusOperationOptions options = - NexusOperationOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofSeconds(10)) - .build(); - NexusServiceOptions serviceOptions = - NexusServiceOptions.newBuilder().setOperationOptions(options).build(); - TestNexusServices.TestNexusService1 serviceStub = - Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); - Workflow.newCancellationScope( - () -> { - NexusOperationHandle handle = - Workflow.startNexusOperation(serviceStub::operation, "block"); - if (input.isEmpty()) { - handle.getExecution().get(); - } - CancellationScope.current().cancel(); - handle.getResult().get(); - }) - .run(); - return "Should not get here"; - } - } - - @ServiceImpl(service = TestNexusServices.TestNexusService1.class) - public class TestNexusServiceImpl { - @OperationImpl - public OperationHandler operation() { - return WorkflowRunOperation.fromWorkflowMethod( - (context, details, input) -> - Nexus.getOperationContext() - .getWorkflowClient() - .newWorkflowStub( - AsyncWorkflowOperationTest.OperationWorkflow.class, - WorkflowOptions.newBuilder() - .setWorkflowId(details.getRequestId()) - .build()) - ::execute); - } - } -} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java index 3b25c45bd1..544daed8e4 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailMetricTest.java @@ -24,9 +24,9 @@ import com.google.common.collect.ImmutableMap; import com.uber.m3.tally.RootScopeBuilder; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; +import io.nexusrpc.handler.HandlerException; import io.nexusrpc.handler.OperationHandler; -import io.nexusrpc.handler.OperationHandlerException; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; import io.temporal.api.common.v1.WorkflowExecution; @@ -34,6 +34,7 @@ import io.temporal.client.WorkflowFailedException; import io.temporal.common.reporter.TestStatsReporter; import io.temporal.failure.ApplicationFailure; +import io.temporal.failure.NexusOperationFailure; import io.temporal.serviceclient.MetricsTag; import io.temporal.testUtils.Eventually; import io.temporal.testing.internal.SDKTestWorkflowRule; @@ -46,6 +47,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.junit.Assert; +import org.junit.Assume; import org.junit.Rule; import org.junit.Test; @@ -61,6 +63,7 @@ public class OperationFailMetricTest { new RootScopeBuilder() .reporter(reporter) .reportEvery(com.uber.m3.util.Duration.ofMillis(10))) + .setUseExternalService(false) .build(); private ImmutableMap.Builder getBaseTags() { @@ -76,12 +79,59 @@ private ImmutableMap.Builder getOperationTags() { .put(MetricsTag.NEXUS_OPERATION, "operation"); } + private T assertNexusOperationFailure( + Class expectedCause, WorkflowFailedException workflowException) { + Assert.assertTrue(workflowException.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusOperationFailure = + (NexusOperationFailure) workflowException.getCause(); + Assert.assertEquals( + testWorkflowRule.getNexusEndpoint().getSpec().getName(), + nexusOperationFailure.getEndpoint()); + Assert.assertEquals("TestNexusService1", nexusOperationFailure.getService()); + Assert.assertEquals("operation", nexusOperationFailure.getOperation()); + Assert.assertEquals("", nexusOperationFailure.getOperationToken()); + Assert.assertTrue(expectedCause.isInstance(nexusOperationFailure.getCause())); + return expectedCause.cast(nexusOperationFailure.getCause()); + } + @Test public void failOperationMetrics() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("fail")); + WorkflowFailedException workflowException = + Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("fail")); + ApplicationFailure applicationFailure = + assertNexusOperationFailure(ApplicationFailure.class, workflowException); + Assert.assertEquals("intentional failure", applicationFailure.getOriginalMessage()); + + Map execFailedTags = + getOperationTags().put(MetricsTag.TASK_FAILURE_TYPE, "operation_failed").buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + + @Test + public void failOperationApplicationErrorMetrics() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + + WorkflowFailedException workflowException = + Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("fail-app")); + ApplicationFailure applicationFailure = + assertNexusOperationFailure(ApplicationFailure.class, workflowException); + Assert.assertEquals("intentional failure", applicationFailure.getOriginalMessage()); + Assert.assertEquals("TestFailure", applicationFailure.getType()); + Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); Map execFailedTags = getOperationTags().put(MetricsTag.TASK_FAILURE_TYPE, "operation_failed").buildKeepingLast(); @@ -102,7 +152,14 @@ public void failOperationMetrics() { public void failHandlerBadRequestMetrics() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("handlererror")); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("handlererror")); + HandlerException handlerException = + assertNexusOperationFailure(HandlerException.class, workflowException); + Assert.assertTrue(handlerException.getCause() instanceof ApplicationFailure); + ApplicationFailure applicationFailure = (ApplicationFailure) handlerException.getCause(); + Assert.assertEquals("handlererror", applicationFailure.getOriginalMessage()); Map execFailedTags = getOperationTags() @@ -122,11 +179,19 @@ public void failHandlerBadRequestMetrics() { } @Test - public void failHandlerAlreadyStartedMetrics() { + public void failHandlerAppBadRequestMetrics() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - Assert.assertThrows( - WorkflowFailedException.class, () -> workflowStub.execute("already-started")); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("handlererror-app")); + HandlerException handlerException = + assertNexusOperationFailure(HandlerException.class, workflowException); + Assert.assertTrue(handlerException.getCause() instanceof ApplicationFailure); + ApplicationFailure applicationFailure = (ApplicationFailure) handlerException.getCause(); + Assert.assertEquals("intentional failure", applicationFailure.getOriginalMessage()); + Assert.assertEquals("TestFailure", applicationFailure.getType()); + Assert.assertEquals("foo", applicationFailure.getDetails().get(String.class)); Map execFailedTags = getOperationTags() @@ -145,6 +210,34 @@ public void failHandlerAlreadyStartedMetrics() { }); } + @Test + public void failHandlerAlreadyStartedMetrics() { + Assume.assumeFalse("skipping", true); + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("already-started")); + ApplicationFailure applicationFailure = + assertNexusOperationFailure(ApplicationFailure.class, workflowException); + Assert.assertEquals( + "io.temporal.client.WorkflowExecutionAlreadyStarted", applicationFailure.getType()); + + Map execFailedTags = + getOperationTags().put(MetricsTag.TASK_FAILURE_TYPE, "operation_failed").buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + @Test public void failHandlerRetryableApplicationFailureMetrics() { TestWorkflow1 workflowStub = @@ -174,9 +267,14 @@ public void failHandlerRetryableApplicationFailureMetrics() { public void failHandlerNonRetryableApplicationFailureMetrics() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - Assert.assertThrows( - WorkflowFailedException.class, - () -> workflowStub.execute("non-retryable-application-failure")); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, + () -> workflowStub.execute("non-retryable-application-failure")); + HandlerException handlerFailure = + assertNexusOperationFailure(HandlerException.class, workflowException); + Assert.assertTrue(handlerFailure.getMessage().contains("intentional failure")); + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, handlerFailure.getErrorType()); Map execFailedTags = getOperationTags() @@ -218,7 +316,9 @@ public void failHandlerSleepMetrics() throws InterruptedException { public void failHandlerErrorMetrics() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("error")); + WorkflowFailedException workflowException = + Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("error")); + Map execFailedTags = getOperationTags() .put(MetricsTag.TASK_FAILURE_TYPE, "handler_error_INTERNAL") @@ -237,6 +337,31 @@ public void failHandlerErrorMetrics() { }); } + @Test + public void handlerErrorNonRetryableMetrics() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + WorkflowFailedException workflowException = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("handlererror-nonretryable")); + + Map execFailedTags = + getOperationTags() + .put(MetricsTag.TASK_FAILURE_TYPE, "handler_error_INTERNAL") + .buildKeepingLast(); + Eventually.assertEventually( + Duration.ofSeconds(3), + () -> { + reporter.assertTimer( + MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, getBaseTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_EXEC_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertTimer( + MetricsType.NEXUS_TASK_E2E_LATENCY, getOperationTags().buildKeepingLast()); + reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + }); + } + public static class TestNexus implements TestWorkflow1 { @Override public String execute(String operation) { @@ -266,23 +391,35 @@ public OperationHandler operation() { details.getRequestId(), invocationCount.getOrDefault(details.getRequestId(), 0) + 1); if (invocationCount.get(details.getRequestId()) > 1) { - throw new OperationUnsuccessfulException("exceeded invocation count"); + throw OperationException.failure(new RuntimeException("exceeded invocation count")); } switch (operation) { case "success": return operation; case "fail": - throw new OperationUnsuccessfulException("fail"); + throw OperationException.failure(new RuntimeException("intentional failure")); + case "fail-app": + throw OperationException.failure( + ApplicationFailure.newFailure("intentional failure", "TestFailure", "foo")); case "handlererror": - throw new OperationHandlerException( - OperationHandlerException.ErrorType.BAD_REQUEST, "handlererror"); + throw new HandlerException(HandlerException.ErrorType.BAD_REQUEST, "handlererror"); + case "handlererror-app": + throw new HandlerException( + HandlerException.ErrorType.BAD_REQUEST, + ApplicationFailure.newFailure("intentional failure", "TestFailure", "foo")); + case "handlererror-nonretryable": + throw new HandlerException( + HandlerException.ErrorType.INTERNAL, + ApplicationFailure.newNonRetryableFailure("intentional failure", "TestFailure"), + HandlerException.RetryBehavior.NON_RETRYABLE); case "already-started": throw new WorkflowExecutionAlreadyStarted( WorkflowExecution.getDefaultInstance(), "TestWorkflowType", null); case "retryable-application-failure": - throw ApplicationFailure.newFailure("fail", "TestFailure"); + throw ApplicationFailure.newFailure("intentional failure", "TestFailure"); case "non-retryable-application-failure": - throw ApplicationFailure.newNonRetryableFailure("fail", "TestFailure"); + throw ApplicationFailure.newNonRetryableFailure( + "intentional failure", "TestFailure", "foo"); case "sleep": try { Thread.sleep(11000); @@ -292,6 +429,8 @@ public OperationHandler operation() { return operation; case "error": throw new Error("error"); + case "canceled": + throw OperationException.canceled(new RuntimeException("canceled")); default: // Should never happen Assert.fail(); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java index 9a1717a496..6d76704e17 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/OperationFailureConversionTest.java @@ -20,11 +20,10 @@ package io.temporal.workflow.nexus; +import io.nexusrpc.handler.HandlerException; import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.client.WorkflowExecutionAlreadyStarted; import io.temporal.client.WorkflowFailedException; import io.temporal.failure.ApplicationFailure; import io.temporal.failure.NexusOperationFailure; @@ -58,20 +57,10 @@ public void nexusOperationApplicationFailureNonRetryableFailureConversion() { () -> workflowStub.execute("ApplicationFailureNonRetryable")); Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); - Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure); - } - - @Test - public void nexusOperationWorkflowExecutionAlreadyStartedFailureConversion() { - TestWorkflow1 workflowStub = - testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - WorkflowFailedException exception = - Assert.assertThrows( - WorkflowFailedException.class, - () -> workflowStub.execute("WorkflowExecutionAlreadyStarted")); - Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); - NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); - Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure); + Assert.assertTrue(nexusFailure.getCause() instanceof HandlerException); + HandlerException handlerException = (HandlerException) nexusFailure.getCause(); + Assert.assertTrue(handlerException.getMessage().contains("failed to call operation")); + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, handlerException.getErrorType()); } @Test @@ -83,10 +72,10 @@ public void nexusOperationApplicationFailureFailureConversion() { WorkflowFailedException.class, () -> workflowStub.execute("ApplicationFailure")); Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); - Assert.assertTrue(nexusFailure.getCause() instanceof ApplicationFailure); - ApplicationFailure applicationFailure = (ApplicationFailure) nexusFailure.getCause(); - Assert.assertTrue( - applicationFailure.getOriginalMessage().contains("exceeded invocation count")); + Assert.assertTrue(nexusFailure.getCause() instanceof HandlerException); + HandlerException handlerFailure = (HandlerException) nexusFailure.getCause(); + Assert.assertTrue(handlerFailure.getMessage().contains("exceeded invocation count")); + Assert.assertEquals(HandlerException.ErrorType.BAD_REQUEST, handlerFailure.getErrorType()); } public static class TestNexus implements TestWorkflow1 { @@ -127,11 +116,6 @@ public OperationHandler operation() { } else if (name.equals("ApplicationFailureNonRetryable")) { throw ApplicationFailure.newNonRetryableFailure( "failed to call operation", "TestFailure"); - } else if (name.equals("WorkflowExecutionAlreadyStarted")) { - throw new WorkflowExecutionAlreadyStarted( - WorkflowExecution.newBuilder().setWorkflowId("id").setRunId("runId").build(), - "TestWorkflow", - new RuntimeException("already started")); } Assert.fail(); return "fail"; diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java index 5c34d05a1c..3b1e675f6a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationCancelledTest.java @@ -20,6 +20,7 @@ package io.temporal.workflow.nexus; +import io.nexusrpc.OperationException; import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; @@ -58,6 +59,20 @@ public void syncOperationImmediatelyCancelled() { "operation canceled before it was started", canceledFailure.getOriginalMessage()); } + @Test + public void syncOperationCanceledInStartHandler() { + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class); + WorkflowFailedException exception = + Assert.assertThrows( + WorkflowFailedException.class, () -> workflowStub.execute("cancel-in-handler")); + Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); + NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); + Assert.assertTrue(nexusFailure.getCause() instanceof CanceledFailure); + CanceledFailure canceledFailure = (CanceledFailure) nexusFailure.getCause(); + Assert.assertEquals("operation canceled in handler", canceledFailure.getOriginalMessage()); + } + public static class TestNexus implements TestWorkflows.TestWorkflow1 { @Override public String execute(String input) { @@ -71,11 +86,13 @@ public String execute(String input) { Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); Workflow.newCancellationScope( () -> { - Promise promise = Async.function(serviceStub::operation, "to be cancelled"); - if (input.isEmpty()) { + Promise promise = Async.function(serviceStub::operation, input); + if (!input.equals("immediately")) { Workflow.sleep(Duration.ofSeconds(1)); } - CancellationScope.current().cancel(); + if (!input.equals("cancel-in-handler")) { + CancellationScope.current().cancel(); + } promise.get(); }) .run(); @@ -89,7 +106,11 @@ public class TestNexusServiceImpl { public OperationHandler operation() { // Implemented inline return OperationHandler.sync( - (ctx, details, name) -> { + (ctx, details, input) -> { + if (input.equals("cancel-in-handler")) { + throw OperationException.canceled( + new RuntimeException("operation canceled in handler")); + } throw new RuntimeException("failed to call operation"); }); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java index be79bbb8b5..eaf2dacacd 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationFailTest.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.uber.m3.tally.RootScopeBuilder; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; @@ -149,7 +149,7 @@ public OperationHandler operation() { // Implemented inline return OperationHandler.sync( (ctx, details, name) -> { - throw new OperationUnsuccessfulException("failed to call operation"); + throw OperationException.failure(new RuntimeException("failed to call operation")); }); } } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java index cce9ecc5a8..b1aeddfd2f 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationStubTest.java @@ -71,7 +71,7 @@ public String execute(String input) { NexusOperationExecution syncExec = syncOpHandle.getExecution().get(); // Execution id is not present for synchronous operations Assert.assertFalse( - "Operation id should not be present", syncExec.getOperationId().isPresent()); + "Operation token should not be present", syncExec.getOperationToken().isPresent()); // Result should always be completed for a synchronous operations when the Execution // is resolved Assert.assertTrue("Result should be completed", syncOpHandle.getResult().isCompleted()); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java index a7bef6bf1f..19563b7729 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/TerminateWorkflowAsyncOperationTest.java @@ -27,7 +27,6 @@ import io.nexusrpc.handler.ServiceImpl; import io.temporal.client.WorkflowFailedException; import io.temporal.client.WorkflowOptions; -import io.temporal.failure.ApplicationFailure; import io.temporal.failure.NexusOperationFailure; import io.temporal.failure.TerminatedFailure; import io.temporal.nexus.Nexus; @@ -56,20 +55,9 @@ public void terminateAsyncOperation() { Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute("")); Assert.assertTrue(exception.getCause() instanceof NexusOperationFailure); NexusOperationFailure nexusFailure = (NexusOperationFailure) exception.getCause(); - // TODO(https://github.com/temporalio/sdk-java/issues/2358): Test server needs to be fixed to - // return the correct type - Assert.assertTrue( - nexusFailure.getCause() instanceof ApplicationFailure - || nexusFailure.getCause() instanceof TerminatedFailure); - if (nexusFailure.getCause() instanceof ApplicationFailure) { - Assert.assertEquals( - "operation terminated", - ((ApplicationFailure) nexusFailure.getCause()).getOriginalMessage()); - } else { - Assert.assertEquals( - "operation terminated", - ((TerminatedFailure) nexusFailure.getCause()).getOriginalMessage()); - } + Assert.assertTrue(nexusFailure.getCause() instanceof TerminatedFailure); + Assert.assertEquals( + "operation terminated", ((TerminatedFailure) nexusFailure.getCause()).getOriginalMessage()); } @Service @@ -96,7 +84,7 @@ public String execute(String input) { NexusOperationHandle handle = Workflow.startNexusOperation(serviceStub::operation, "block"); // Wait for the operation to start - String workflowId = handle.getExecution().get().getOperationId().get(); + String workflowId = handle.getExecution().get().getOperationToken().get(); // Terminate the operation serviceStub.terminate(workflowId); // Try to get the result, expect this to throw diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java index 27340fbec2..e53c371aed 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/UntypedSyncOperationStubTest.java @@ -71,7 +71,7 @@ public String execute(String name) { serviceStub.start("operation", String.class, name); NexusOperationExecution syncOpExec = syncOpHandle.getExecution().get(); // Execution id is not present for synchronous operations - if (syncOpExec.getOperationId().isPresent()) { + if (syncOpExec.getOperationToken().isPresent()) { Assert.fail("Execution id is present"); } // Result should always be completed for a synchronous operations when the Execution promise diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java index a9b085a26a..add42a7cc7 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/WorkflowOperationLinkingTest.java @@ -103,7 +103,7 @@ public String execute(String input) { // Signal the operation to unblock, this makes sure the operation doesn't complete before the // operation // started event is written to history - Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationId().get()) + Workflow.newExternalWorkflowStub(OperationWorkflow.class, asyncExec.getOperationToken().get()) .unblock(); return asyncOpHandle.getResult().get(); } diff --git a/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json b/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json index 8362354566..93c35f274a 100644 --- a/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json +++ b/temporal-sdk/src/test/resources/testAsyncWorkflowOperationTestHistory.json @@ -2,15 +2,15 @@ "events": [ { "eventId": "1", - "eventTime": "2024-11-08T21:52:52.621222Z", + "eventTime": "2025-01-13T18:44:53.631719593Z", "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED", - "taskId": "1049457", + "taskId": "1051840", "workflowExecutionStartedEventAttributes": { "workflowType": { "name": "TestWorkflow1" }, "taskQueue": { - "name": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99", + "name": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", "kind": "TASK_QUEUE_KIND_NORMAL" }, "input": { @@ -19,30 +19,30 @@ "metadata": { "encoding": "anNvbi9wbGFpbg==" }, - "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tNjEyMzUwNjQtZGZmZi00YTc5LThkNmItYTU0MDc3NmM4YTk5Ig==" + "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tMDNkM2JlNWEtN2RjNS00ZDVlLWJhNmEtZGFjZTE2NTMzYTZmIg==" } ] }, "workflowExecutionTimeout": "0s", "workflowRunTimeout": "200s", "workflowTaskTimeout": "5s", - "originalExecutionRunId": "218b05cf-6fc4-4855-9ff1-f4b27e118114", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "firstExecutionRunId": "218b05cf-6fc4-4855-9ff1-f4b27e118114", + "originalExecutionRunId": "95d51fae-165d-4763-b4d7-899754cc6fb6", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "firstExecutionRunId": "95d51fae-165d-4763-b4d7-899754cc6fb6", "attempt": 1, "firstWorkflowTaskBackoff": "0s", "header": {}, - "workflowId": "76d7c4d0-9afe-46f0-af8a-0328b30a8438" + "workflowId": "db5c83db-aec8-4fe4-9b3b-d2743593147f" } }, { "eventId": "2", - "eventTime": "2024-11-08T21:52:52.621331Z", + "eventTime": "2025-01-13T18:44:53.631800302Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049458", + "taskId": "1051841", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99", + "name": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", "kind": "TASK_QUEUE_KIND_NORMAL" }, "startToCloseTimeout": "5s", @@ -51,25 +51,25 @@ }, { "eventId": "3", - "eventTime": "2024-11-08T21:52:52.624710Z", + "eventTime": "2025-01-13T18:44:53.640801427Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049464", + "taskId": "1051847", "workflowTaskStartedEventAttributes": { "scheduledEventId": "2", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "e9ae8f8b-15a0-47d3-87d7-fd5f31f156fe", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "9bc3e2b9-3b31-426d-a18c-ab89a6849f72", "historySizeBytes": "510" } }, { "eventId": "4", - "eventTime": "2024-11-08T21:52:52.714850Z", + "eventTime": "2025-01-13T18:44:53.745569135Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049468", + "taskId": "1051851", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "2", "startedEventId": "3", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "sdkMetadata": { "langUsedFlags": [ @@ -81,36 +81,36 @@ }, { "eventId": "5", - "eventTime": "2024-11-08T21:52:52.714937Z", + "eventTime": "2025-01-13T18:44:53.745616635Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", - "taskId": "1049469", + "taskId": "1051852", "nexusOperationScheduledEventAttributes": { - "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99", + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", "service": "TestNexusService1", "operation": "operation", "input": { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, - "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tNjEyMzUwNjQtZGZmZi00YTc5LThkNmItYTU0MDc3NmM4YTk5Ig==" + "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tMDNkM2JlNWEtN2RjNS00ZDVlLWJhNmEtZGFjZTE2NTMzYTZmIg==" }, "scheduleToCloseTimeout": "200s", "workflowTaskCompletedEventId": "4", - "requestId": "7e3f6507-ee51-447b-b241-eed2516ec5d1", - "endpointId": "6e04916c-9d6e-4736-9ff5-a0db60b1ea33" + "requestId": "68d7908c-f478-4f6e-907a-9d8d40145309", + "endpointId": "160b86b3-173f-4a31-a3ba-3f85f2f60e49" } }, { "eventId": "6", - "eventTime": "2024-11-08T21:52:52.749777Z", + "eventTime": "2025-01-13T18:44:53.790009344Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", - "taskId": "1049484", + "taskId": "1051857", "links": [ { "workflowEvent": { "namespace": "UnitTest", - "workflowId": "test-prefix7e3f6507-ee51-447b-b241-eed2516ec5d1", - "runId": "3ba25896-55f5-400d-bc09-a0dcaab32bf3", + "workflowId": "test-prefix68d7908c-f478-4f6e-907a-9d8d40145309", + "runId": "c55d43ef-16f3-4e66-b35c-f16729a23e80", "eventRef": { "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED" } @@ -119,20 +119,20 @@ ], "nexusOperationStartedEventAttributes": { "scheduledEventId": "5", - "operationId": "test-prefix7e3f6507-ee51-447b-b241-eed2516ec5d1", - "requestId": "7e3f6507-ee51-447b-b241-eed2516ec5d1" + "operationId": "test-prefix68d7908c-f478-4f6e-907a-9d8d40145309", + "requestId": "68d7908c-f478-4f6e-907a-9d8d40145309" } }, { "eventId": "7", - "eventTime": "2024-11-08T21:52:52.749815Z", + "eventTime": "2025-01-13T18:44:53.790034510Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049485", + "taskId": "1051858", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -140,55 +140,55 @@ }, { "eventId": "8", - "eventTime": "2024-11-08T21:52:52.751464Z", + "eventTime": "2025-01-13T18:44:53.792979969Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049489", + "taskId": "1051862", "workflowTaskStartedEventAttributes": { "scheduledEventId": "7", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "6e8bc95e-46eb-4966-a2d5-a6766f036e48", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "3be50792-f088-4b5f-8dcf-9f132b0ee9db", "historySizeBytes": "1441" } }, { "eventId": "9", - "eventTime": "2024-11-08T21:52:52.756929Z", + "eventTime": "2025-01-13T18:44:53.801908844Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049500", + "taskId": "1051866", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "7", "startedEventId": "8", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "10", - "eventTime": "2024-11-08T21:52:52.753879Z", + "eventTime": "2025-01-13T18:44:53.798324760Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", - "taskId": "1049501", + "taskId": "1051867", "nexusOperationCompletedEventAttributes": { "scheduledEventId": "5", "result": { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, - "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tNjEyMzUwNjQtZGZmZi00YTc5LThkNmItYTU0MDc3NmM4YTk5Ig==" + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tMDNkM2JlNWEtN2RjNS00ZDVlLWJhNmEtZGFjZTE2NTMzYTZmIg==" }, - "requestId": "7e3f6507-ee51-447b-b241-eed2516ec5d1" + "requestId": "68d7908c-f478-4f6e-907a-9d8d40145309" } }, { "eventId": "11", - "eventTime": "2024-11-08T21:52:52.756942Z", + "eventTime": "2025-01-13T18:44:53.801925052Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049502", + "taskId": "1051868", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -196,61 +196,61 @@ }, { "eventId": "12", - "eventTime": "2024-11-08T21:52:52.757765Z", + "eventTime": "2025-01-13T18:44:53.805019802Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049506", + "taskId": "1051872", "workflowTaskStartedEventAttributes": { "scheduledEventId": "11", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "5a76a21a-2afe-498f-a600-f7de1b4fb043", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "21e966cd-315e-43b7-be8f-af3fe3340e8b", "historySizeBytes": "2014" } }, { "eventId": "13", - "eventTime": "2024-11-08T21:52:52.763136Z", + "eventTime": "2025-01-13T18:44:53.813525052Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049510", + "taskId": "1051876", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "11", "startedEventId": "12", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "14", - "eventTime": "2024-11-08T21:52:52.763161Z", + "eventTime": "2025-01-13T18:44:53.813548010Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", - "taskId": "1049511", + "taskId": "1051877", "nexusOperationScheduledEventAttributes": { - "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99", + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", "service": "TestNexusService1", "operation": "operation", "input": { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, - "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tNjEyMzUwNjQtZGZmZi00YTc5LThkNmItYTU0MDc3NmM4YTk5Ig==" + "data": "IldvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tMDNkM2JlNWEtN2RjNS00ZDVlLWJhNmEtZGFjZTE2NTMzYTZmIg==" }, "scheduleToCloseTimeout": "200s", "workflowTaskCompletedEventId": "13", - "requestId": "2f35f6bc-7c57-4594-a338-568a5ebb0995", - "endpointId": "6e04916c-9d6e-4736-9ff5-a0db60b1ea33" + "requestId": "8aed6b6f-1012-43d3-a4ad-782a4f35d6f9", + "endpointId": "160b86b3-173f-4a31-a3ba-3f85f2f60e49" } }, { "eventId": "15", - "eventTime": "2024-11-08T21:52:52.767904Z", + "eventTime": "2025-01-13T18:44:53.824807219Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", - "taskId": "1049524", + "taskId": "1051886", "links": [ { "workflowEvent": { "namespace": "UnitTest", - "workflowId": "test-prefix2f35f6bc-7c57-4594-a338-568a5ebb0995", - "runId": "102624ec-eb6d-4fa0-8cfb-96c7ecfaca68", + "workflowId": "test-prefix8aed6b6f-1012-43d3-a4ad-782a4f35d6f9", + "runId": "254f6cda-1ef9-4836-9ed4-0bab4b69c32e", "eventRef": { "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED" } @@ -259,20 +259,20 @@ ], "nexusOperationStartedEventAttributes": { "scheduledEventId": "14", - "operationId": "test-prefix2f35f6bc-7c57-4594-a338-568a5ebb0995", - "requestId": "2f35f6bc-7c57-4594-a338-568a5ebb0995" + "operationId": "test-prefix8aed6b6f-1012-43d3-a4ad-782a4f35d6f9", + "requestId": "8aed6b6f-1012-43d3-a4ad-782a4f35d6f9" } }, { "eventId": "16", - "eventTime": "2024-11-08T21:52:52.767918Z", + "eventTime": "2025-01-13T18:44:53.824828469Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049525", + "taskId": "1051887", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -280,55 +280,55 @@ }, { "eventId": "17", - "eventTime": "2024-11-08T21:52:52.768524Z", + "eventTime": "2025-01-13T18:44:53.827101802Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049529", + "taskId": "1051894", "workflowTaskStartedEventAttributes": { "scheduledEventId": "16", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "d5ba928f-d97e-4858-a208-433129c7fe14", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "bae8c422-e1d4-44a6-a8a8-fd50fcfce66e", "historySizeBytes": "2940" } }, { "eventId": "18", - "eventTime": "2024-11-08T21:52:52.770895Z", + "eventTime": "2025-01-13T18:44:53.833686844Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049540", + "taskId": "1051906", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "16", "startedEventId": "17", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "19", - "eventTime": "2024-11-08T21:52:52.771705Z", + "eventTime": "2025-01-13T18:44:53.838903219Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", - "taskId": "1049542", + "taskId": "1051908", "nexusOperationCompletedEventAttributes": { "scheduledEventId": "14", "result": { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, - "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tNjEyMzUwNjQtZGZmZi00YTc5LThkNmItYTU0MDc3NmM4YTk5Ig==" + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tMDNkM2JlNWEtN2RjNS00ZDVlLWJhNmEtZGFjZTE2NTMzYTZmIg==" }, - "requestId": "2f35f6bc-7c57-4594-a338-568a5ebb0995" + "requestId": "8aed6b6f-1012-43d3-a4ad-782a4f35d6f9" } }, { "eventId": "20", - "eventTime": "2024-11-08T21:52:52.771717Z", + "eventTime": "2025-01-13T18:44:53.838918927Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049543", + "taskId": "1051909", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -336,36 +336,36 @@ }, { "eventId": "21", - "eventTime": "2024-11-08T21:52:52.772532Z", + "eventTime": "2025-01-13T18:44:53.841440677Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049547", + "taskId": "1051913", "workflowTaskStartedEventAttributes": { "scheduledEventId": "20", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "0082e582-3175-4c09-aff6-497554b2e944", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "e49427c6-c07e-4cc4-ac9d-8eb1e9f1577a", "historySizeBytes": "3513" } }, { "eventId": "22", - "eventTime": "2024-11-08T21:52:52.775368Z", + "eventTime": "2025-01-13T18:44:53.847943469Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049551", + "taskId": "1051917", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "20", "startedEventId": "21", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "23", - "eventTime": "2024-11-08T21:52:52.775388Z", + "eventTime": "2025-01-13T18:44:53.847966344Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", - "taskId": "1049552", + "taskId": "1051918", "nexusOperationScheduledEventAttributes": { - "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99", + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", "service": "TestNexusService1", "operation": "operation", "input": { @@ -376,21 +376,21 @@ }, "scheduleToCloseTimeout": "200s", "workflowTaskCompletedEventId": "22", - "requestId": "9e944268-f10e-443f-adf4-8e4cb61b04c8", - "endpointId": "6e04916c-9d6e-4736-9ff5-a0db60b1ea33" + "requestId": "c1b35560-781c-4cf7-9c05-429c5153b43b", + "endpointId": "160b86b3-173f-4a31-a3ba-3f85f2f60e49" } }, { "eventId": "24", - "eventTime": "2024-11-08T21:52:52.779850Z", + "eventTime": "2025-01-13T18:44:53.858756677Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", - "taskId": "1049565", + "taskId": "1051921", "links": [ { "workflowEvent": { "namespace": "UnitTest", - "workflowId": "test-prefix9e944268-f10e-443f-adf4-8e4cb61b04c8", - "runId": "e59938c7-cdae-41ea-8edb-279e79a2812d", + "workflowId": "test-prefixc1b35560-781c-4cf7-9c05-429c5153b43b", + "runId": "8649442c-1df5-4d8f-95f2-097f47024fd0", "eventRef": { "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED" } @@ -399,20 +399,20 @@ ], "nexusOperationStartedEventAttributes": { "scheduledEventId": "23", - "operationId": "test-prefix9e944268-f10e-443f-adf4-8e4cb61b04c8", - "requestId": "9e944268-f10e-443f-adf4-8e4cb61b04c8" + "operationId": "test-prefixc1b35560-781c-4cf7-9c05-429c5153b43b", + "requestId": "c1b35560-781c-4cf7-9c05-429c5153b43b" } }, { "eventId": "25", - "eventTime": "2024-11-08T21:52:52.779881Z", + "eventTime": "2025-01-13T18:44:53.858780510Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049566", + "taskId": "1051922", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -420,39 +420,39 @@ }, { "eventId": "26", - "eventTime": "2024-11-08T21:52:52.780474Z", + "eventTime": "2025-01-13T18:44:53.861452260Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049570", + "taskId": "1051926", "workflowTaskStartedEventAttributes": { "scheduledEventId": "25", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "9fe9e7f6-92cc-4579-b303-c8e31ebb7999", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "afb917cd-cccf-4fd5-aedb-36a396696927", "historySizeBytes": "4373" } }, { "eventId": "27", - "eventTime": "2024-11-08T21:52:52.788056Z", + "eventTime": "2025-01-13T18:44:53.872756510Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049577", + "taskId": "1051930", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "25", "startedEventId": "26", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "28", - "eventTime": "2024-11-08T21:52:52.788098Z", + "eventTime": "2025-01-13T18:44:53.872783719Z", "eventType": "EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED", - "taskId": "1049578", + "taskId": "1051931", "signalExternalWorkflowExecutionInitiatedEventAttributes": { "workflowTaskCompletedEventId": "27", - "namespaceId": "be855300-b554-4000-a9e1-7ee869ffbae4", + "namespaceId": "5b6fc33e-1438-415c-af82-23d9925f361a", "workflowExecution": { - "workflowId": "test-prefix9e944268-f10e-443f-adf4-8e4cb61b04c8" + "workflowId": "test-prefixc1b35560-781c-4cf7-9c05-429c5153b43b" }, "signalName": "unblock", "header": {} @@ -460,28 +460,28 @@ }, { "eventId": "29", - "eventTime": "2024-11-08T21:52:52.789851Z", + "eventTime": "2025-01-13T18:44:53.878052760Z", "eventType": "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED", - "taskId": "1049586", + "taskId": "1051934", "externalWorkflowExecutionSignaledEventAttributes": { "initiatedEventId": "28", "namespace": "UnitTest", - "namespaceId": "be855300-b554-4000-a9e1-7ee869ffbae4", + "namespaceId": "5b6fc33e-1438-415c-af82-23d9925f361a", "workflowExecution": { - "workflowId": "test-prefix9e944268-f10e-443f-adf4-8e4cb61b04c8" + "workflowId": "test-prefixc1b35560-781c-4cf7-9c05-429c5153b43b" } } }, { "eventId": "30", - "eventTime": "2024-11-08T21:52:52.789853Z", + "eventTime": "2025-01-13T18:44:53.878056135Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049587", + "taskId": "1051935", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -489,34 +489,34 @@ }, { "eventId": "31", - "eventTime": "2024-11-08T21:52:52.790401Z", + "eventTime": "2025-01-13T18:44:53.881553635Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049594", + "taskId": "1051939", "workflowTaskStartedEventAttributes": { "scheduledEventId": "30", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "4799907f-a6c8-4fe9-9a94-cdb9b173deb4", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "3c3d26ce-53bf-418c-9231-701ac85174ba", "historySizeBytes": "5002" } }, { "eventId": "32", - "eventTime": "2024-11-08T21:52:52.794177Z", + "eventTime": "2025-01-13T18:44:53.888379219Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049599", + "taskId": "1051943", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "30", "startedEventId": "31", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "33", - "eventTime": "2024-11-08T21:52:52.796208Z", + "eventTime": "2025-01-13T18:44:53.892486510Z", "eventType": "EVENT_TYPE_NEXUS_OPERATION_COMPLETED", - "taskId": "1049608", + "taskId": "1051945", "nexusOperationCompletedEventAttributes": { "scheduledEventId": "23", "result": { @@ -525,19 +525,19 @@ }, "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IGJsb2NrIg==" }, - "requestId": "9e944268-f10e-443f-adf4-8e4cb61b04c8" + "requestId": "c1b35560-781c-4cf7-9c05-429c5153b43b" } }, { "eventId": "34", - "eventTime": "2024-11-08T21:52:52.796218Z", + "eventTime": "2025-01-13T18:44:53.892504260Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", - "taskId": "1049609", + "taskId": "1051946", "workflowTaskScheduledEventAttributes": { "taskQueue": { - "name": "12998@Quinn-Klassens-MacBook-Pro.local:fa5cdd42-a3a4-4432-a16d-2f542f40c458", + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", "kind": "TASK_QUEUE_KIND_STICKY", - "normalName": "WorkflowTest-testWorkflowOperation-61235064-dfff-4a79-8d6b-a540776c8a99" + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" }, "startToCloseTimeout": "5s", "attempt": 1 @@ -545,34 +545,204 @@ }, { "eventId": "35", - "eventTime": "2024-11-08T21:52:52.796781Z", + "eventTime": "2025-01-13T18:44:53.894784260Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", - "taskId": "1049613", + "taskId": "1051950", "workflowTaskStartedEventAttributes": { "scheduledEventId": "34", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", - "requestId": "85112809-5afc-45de-83fb-fb9c07a9be80", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "bcbb76e0-fcd2-4984-abbe-e152e40dcb44", "historySizeBytes": "5507" } }, { "eventId": "36", - "eventTime": "2024-11-08T21:52:52.799019Z", + "eventTime": "2025-01-13T18:44:53.900269469Z", "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", - "taskId": "1049617", + "taskId": "1051954", "workflowTaskCompletedEventAttributes": { "scheduledEventId": "34", "startedEventId": "35", - "identity": "12998@Quinn-Klassens-MacBook-Pro.local", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", "workerVersion": {}, "meteringMetadata": {} } }, { "eventId": "37", - "eventTime": "2024-11-08T21:52:52.799037Z", + "eventTime": "2025-01-13T18:44:53.900292177Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_SCHEDULED", + "taskId": "1051955", + "nexusOperationScheduledEventAttributes": { + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", + "service": "TestNexusService1", + "operation": "operation", + "input": { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImZhaWwi" + }, + "scheduleToCloseTimeout": "200s", + "workflowTaskCompletedEventId": "36", + "requestId": "f4603eaa-3f02-47e4-94a8-d523ba2c8c9a", + "endpointId": "160b86b3-173f-4a31-a3ba-3f85f2f60e49" + } + }, + { + "eventId": "38", + "eventTime": "2025-01-13T18:44:53.912549385Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_STARTED", + "taskId": "1051958", + "links": [ + { + "workflowEvent": { + "namespace": "UnitTest", + "workflowId": "test-prefixf4603eaa-3f02-47e4-94a8-d523ba2c8c9a", + "runId": "0ef8eea4-8d41-4c15-b1fa-a06740fb3187", + "eventRef": { + "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED" + } + } + } + ], + "nexusOperationStartedEventAttributes": { + "scheduledEventId": "37", + "operationId": "test-prefixf4603eaa-3f02-47e4-94a8-d523ba2c8c9a", + "requestId": "f4603eaa-3f02-47e4-94a8-d523ba2c8c9a" + } + }, + { + "eventId": "39", + "eventTime": "2025-01-13T18:44:53.912571177Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1051959", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "40", + "eventTime": "2025-01-13T18:44:53.915198260Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1051963", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "39", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "7afe5f71-4928-450e-a6e6-5efd802bae74", + "historySizeBytes": "6366" + } + }, + { + "eventId": "41", + "eventTime": "2025-01-13T18:44:53.920337219Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1051967", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "39", + "startedEventId": "40", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "42", + "eventTime": "2025-01-13T18:44:53.933095135Z", + "eventType": "EVENT_TYPE_NEXUS_OPERATION_FAILED", + "taskId": "1051969", + "nexusOperationFailedEventAttributes": { + "scheduledEventId": "37", + "failure": { + "message": "nexus operation completed unsuccessfully", + "cause": { + "message": "simulated failure", + "source": "JavaSDK", + "stackTrace": "io.temporal.failure.ApplicationFailure.newFailureWithCause(ApplicationFailure.java:95)\nio.temporal.workflow.nexus.AsyncWorkflowOperationTest$TestOperationWorkflow.execute(AsyncWorkflowOperationTest.java:138)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:566)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:381)\nio.temporal.common.interceptors.WorkflowInboundCallsInterceptorBase.execute(WorkflowInboundCallsInterceptorBase.java:40)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:353)\n", + "cause": { + "message": "simulated cause", + "source": "JavaSDK", + "stackTrace": "io.temporal.failure.ApplicationFailure.newFailureWithCause(ApplicationFailure.java:95)\nio.temporal.failure.ApplicationFailure.newFailure(ApplicationFailure.java:75)\nio.temporal.workflow.nexus.AsyncWorkflowOperationTest$TestOperationWorkflow.execute(AsyncWorkflowOperationTest.java:141)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\njava.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\njava.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.base/java.lang.reflect.Method.invoke(Method.java:566)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation$RootWorkflowInboundCallsInterceptor.execute(POJOWorkflowImplementationFactory.java:381)\nio.temporal.common.interceptors.WorkflowInboundCallsInterceptorBase.execute(WorkflowInboundCallsInterceptorBase.java:40)\nio.temporal.internal.sync.POJOWorkflowImplementationFactory$POJOWorkflowImplementation.execute(POJOWorkflowImplementationFactory.java:353)\n", + "applicationFailureInfo": { + "type": "SimulatedCause" + } + }, + "applicationFailureInfo": { + "type": "SimulatedFailureType", + "details": { + "payloads": [ + { + "metadata": { + "encoding": "anNvbi9wbGFpbg==" + }, + "data": "ImZvbyI=" + } + ] + } + } + }, + "nexusOperationExecutionFailureInfo": { + "scheduledEventId": "37", + "endpoint": "test-endpoint-WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f", + "service": "TestNexusService1", + "operation": "operation", + "operationId": "test-prefixf4603eaa-3f02-47e4-94a8-d523ba2c8c9a" + } + }, + "requestId": "f4603eaa-3f02-47e4-94a8-d523ba2c8c9a" + } + }, + { + "eventId": "43", + "eventTime": "2025-01-13T18:44:53.933113885Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED", + "taskId": "1051970", + "workflowTaskScheduledEventAttributes": { + "taskQueue": { + "name": "66541@Quinn-Klassens-MacBook-Pro.local:cf714507-4b77-420f-9ac6-e76b4767ea8b", + "kind": "TASK_QUEUE_KIND_STICKY", + "normalName": "WorkflowTest-testWorkflowOperation-03d3be5a-7dc5-4d5e-ba6a-dace16533a6f" + }, + "startToCloseTimeout": "5s", + "attempt": 1 + } + }, + { + "eventId": "44", + "eventTime": "2025-01-13T18:44:53.935680260Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED", + "taskId": "1051974", + "workflowTaskStartedEventAttributes": { + "scheduledEventId": "43", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "requestId": "758ed8a2-4fa2-46aa-93e9-8ec84bb72662", + "historySizeBytes": "9210" + } + }, + { + "eventId": "45", + "eventTime": "2025-01-13T18:44:53.943236844Z", + "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED", + "taskId": "1051978", + "workflowTaskCompletedEventAttributes": { + "scheduledEventId": "43", + "startedEventId": "44", + "identity": "66541@Quinn-Klassens-MacBook-Pro.local", + "workerVersion": {}, + "meteringMetadata": {} + } + }, + { + "eventId": "46", + "eventTime": "2025-01-13T18:44:53.943254927Z", "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED", - "taskId": "1049618", + "taskId": "1051979", "workflowExecutionCompletedEventAttributes": { "result": { "payloads": [ @@ -580,11 +750,11 @@ "metadata": { "encoding": "anNvbi9wbGFpbg==" }, - "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tNjEyMzUwNjQtZGZmZi00YTc5LThkNmItYTU0MDc3NmM4YTk5Ig==" + "data": "IkhlbGxvIGZyb20gb3BlcmF0aW9uIHdvcmtmbG93IFdvcmtmbG93VGVzdC10ZXN0V29ya2Zsb3dPcGVyYXRpb24tMDNkM2JlNWEtN2RjNS00ZDVlLWJhNmEtZGFjZTE2NTMzYTZmIg==" } ] }, - "workflowTaskCompletedEventId": "36" + "workflowTaskCompletedEventId": "45" } } ] diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java index ab3d6b9674..647bfbbc5a 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -51,14 +51,13 @@ import com.google.protobuf.util.Timestamps; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.nexusrpc.handler.HandlerException; import io.temporal.api.command.v1.*; import io.temporal.api.common.v1.*; import io.temporal.api.enums.v1.*; import io.temporal.api.errordetails.v1.QueryFailedFailure; -import io.temporal.api.failure.v1.ApplicationFailureInfo; +import io.temporal.api.failure.v1.*; import io.temporal.api.failure.v1.Failure; -import io.temporal.api.failure.v1.NexusOperationFailureInfo; -import io.temporal.api.failure.v1.TimeoutFailureInfo; import io.temporal.api.history.v1.*; import io.temporal.api.nexus.v1.*; import io.temporal.api.nexus.v1.Link; @@ -753,6 +752,7 @@ private static void startNexusOperation( .setNexusOperationStartedEventAttributes( NexusOperationStartedEventAttributes.newBuilder() .setOperationId(resp.getOperationId()) + .setOperationToken(resp.getOperationToken()) .setScheduledEventId(data.scheduledEventId) .setRequestId(data.scheduledEvent.getRequestId())); @@ -818,6 +818,31 @@ private static void timeoutNexusOperation( private static State failNexusOperation( RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) { + // Nexus operation failures are never retryable + if (failure.hasNexusOperationExecutionFailureInfo()) { + // Populate the failure with the operation details + ctx.addEvent( + HistoryEvent.newBuilder() + .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED) + .setNexusOperationFailedEventAttributes( + NexusOperationFailedEventAttributes.newBuilder() + .setRequestId(data.scheduledEvent.getRequestId()) + .setScheduledEventId(data.scheduledEventId) + .setFailure( + failure.toBuilder() + .setNexusOperationExecutionFailureInfo( + failure.getNexusOperationExecutionFailureInfo().toBuilder() + .setEndpoint(data.scheduledEvent.getEndpoint()) + .setService(data.scheduledEvent.getService()) + .setOperation(data.scheduledEvent.getOperation()) + .setOperationId(data.operationId) + .setScheduledEventId(data.scheduledEventId) + .build()) + .build())) + .build()); + return FAILED; + } + RetryState retryState = attemptNexusOperationRetry(ctx, Optional.of(failure), data); if (retryState == RetryState.RETRY_STATE_IN_PROGRESS || retryState == RetryState.RETRY_STATE_TIMEOUT) { @@ -867,6 +892,26 @@ private static RetryState attemptNexusOperationRetry( } } + if (failure.get().hasNexusHandlerFailureInfo()) { + NexusHandlerFailureInfo handlerFailure = failure.get().getNexusHandlerFailureInfo(); + HandlerException.RetryBehavior retryBehavior = HandlerException.RetryBehavior.UNSPECIFIED; + switch (handlerFailure.getRetryBehavior()) { + case NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_NON_RETRYABLE: + retryBehavior = HandlerException.RetryBehavior.NON_RETRYABLE; + break; + case NEXUS_HANDLER_ERROR_RETRY_BEHAVIOR_RETRYABLE: + retryBehavior = HandlerException.RetryBehavior.RETRYABLE; + break; + } + // Deserialize the HandlerFailure to a HandlerException to check if it is retryable, we do not + // need to convert + // the whole error chain, so we don't pass cause. + HandlerException he = new HandlerException(handlerFailure.getType(), null, retryBehavior); + if (!he.isRetryable()) { + return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE; + } + } + TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure); TestServiceRetryState.BackoffInterval backoffInterval = data.retryState.getBackoffIntervalInSeconds( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index 832bdbc3dc..d3c61408e7 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -115,6 +115,8 @@ void startNexusOperation( void cancelNexusOperation(NexusOperationRef ref, Failure failure); + void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref); + void completeNexusOperation(NexusOperationRef ref, Payload result); void completeAsyncNexusOperation( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 4896f53299..e4bf7b3e67 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -2245,6 +2245,19 @@ public void cancelNexusOperation(NexusOperationRef ref, Failure failure) { }); } + @Override + public void cancelNexusOperationRequestAcknowledge(NexusOperationRef ref) { + update( + ctx -> { + StateMachine operation = + getPendingNexusOperation(ref.getScheduledEventId()); + if (!operationInFlight(operation.getState())) { + return; + } + ctx.unlockTimer("cancelNexusOperationRequestAcknowledge"); + }); + } + @Override public void completeNexusOperation(NexusOperationRef ref, Payload result) { update( diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 48cb601f7e..f2ae272ca0 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -24,12 +24,12 @@ import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.START_WORKFLOW; import static io.temporal.api.workflowservice.v1.ExecuteMultiOperationRequest.Operation.OperationCase.UPDATE_WORKFLOW; import static io.temporal.internal.testservice.CronUtils.getBackoffInterval; +import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.protobuf.ByteString; -import com.google.protobuf.Empty; -import com.google.protobuf.Timestamp; +import com.google.protobuf.*; +import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.Timestamps; import io.grpc.*; import io.grpc.stub.StreamObserver; @@ -43,10 +43,8 @@ import io.temporal.api.enums.v1.*; import io.temporal.api.errordetails.v1.MultiOperationExecutionFailure; import io.temporal.api.errordetails.v1.WorkflowExecutionAlreadyStartedFailure; -import io.temporal.api.failure.v1.ApplicationFailureInfo; -import io.temporal.api.failure.v1.CanceledFailureInfo; +import io.temporal.api.failure.v1.*; import io.temporal.api.failure.v1.Failure; -import io.temporal.api.failure.v1.MultiOperationExecutionAborted; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes; import io.temporal.api.namespace.v1.NamespaceInfo; @@ -89,6 +87,11 @@ public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServiceImplBase implements Closeable { private static final Logger log = LoggerFactory.getLogger(TestWorkflowService.class); + private static final JsonFormat.Printer JSON_PRINTER = JsonFormat.printer(); + private static final JsonFormat.Parser JSON_PARSER = JsonFormat.parser(); + + private static final String FAILURE_TYPE_STRING = Failure.getDescriptor().getFullName(); + private final Map executions = new HashMap<>(); // key->WorkflowId private final Map executionsByWorkflowId = new HashMap<>(); @@ -806,6 +809,14 @@ public void pollNexusTaskQueue( } } + private static Failure wrapNexusOperationFailure(Failure cause) { + return Failure.newBuilder() + .setMessage("nexus operation completed unsuccessfully") + .setNexusOperationExecutionFailureInfo(NexusOperationFailureInfo.newBuilder().build()) + .setCause(cause) + .build(); + } + @Override public void respondNexusTaskCompleted( RespondNexusTaskCompletedRequest request, @@ -826,7 +837,7 @@ public void respondNexusTaskCompleted( .setMessage("operation canceled") .setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance()) .build(); - mutableState.cancelNexusOperation(tt.getOperationRef(), canceled); + mutableState.cancelNexusOperationRequestAcknowledge(tt.getOperationRef()); } else if (request.getResponse().hasStartOperation()) { StartOperationResponse startResp = request.getResponse().getStartOperation(); if (startResp.hasOperationError()) { @@ -839,12 +850,9 @@ public void respondNexusTaskCompleted( .setDetails(nexusFailureMetadataToPayloads(opError.getFailure()))); mutableState.cancelNexusOperation(tt.getOperationRef(), b.build()); } else { - b.setApplicationFailureInfo( - ApplicationFailureInfo.newBuilder() - .setType("NexusOperationFailure") - .setDetails(nexusFailureMetadataToPayloads(opError.getFailure())) - .setNonRetryable(true)); - mutableState.failNexusOperation(tt.getOperationRef(), b.build()); + mutableState.failNexusOperation( + tt.getOperationRef(), + wrapNexusOperationFailure(nexusFailureToAPIFailure(opError.getFailure(), false))); } } else if (startResp.hasAsyncSuccess()) { // Start event is only recorded for async success @@ -913,41 +921,40 @@ public void completeNexusOperation( target.completeAsyncNexusOperation(ref, p, operationID, startLink); break; case EVENT_TYPE_WORKFLOW_EXECUTION_FAILED: - Failure f = - Failure.newBuilder() - .setMessage( - completionEvent - .getWorkflowExecutionFailedEventAttributes() - .getFailure() - .getMessage()) - .build(); - target.failNexusOperation(ref, f); + Failure wfFailure = + completionEvent.getWorkflowExecutionFailedEventAttributes().getFailure(); + target.failNexusOperation(ref, wrapNexusOperationFailure(wfFailure)); break; case EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED: + CanceledFailureInfo.Builder cancelFailure = CanceledFailureInfo.newBuilder(); + if (completionEvent.getWorkflowExecutionCanceledEventAttributes().hasDetails()) { + cancelFailure.setDetails( + completionEvent.getWorkflowExecutionCanceledEventAttributes().getDetails()); + } Failure canceled = Failure.newBuilder() .setMessage("operation canceled") - .setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance()) + .setCanceledFailureInfo(cancelFailure.build()) .build(); target.cancelNexusOperation(ref, canceled); break; case EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED: - Failure terminated = - Failure.newBuilder() - .setMessage("operation terminated") - .setApplicationFailureInfo( - ApplicationFailureInfo.newBuilder().setNonRetryable(true)) - .build(); - target.failNexusOperation(ref, terminated); + target.failNexusOperation( + ref, + wrapNexusOperationFailure( + Failure.newBuilder() + .setMessage("operation terminated") + .setTerminatedFailureInfo(TerminatedFailureInfo.getDefaultInstance()) + .build())); break; case EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT: - Failure timedOut = - Failure.newBuilder() - .setMessage("operation exceeded internal timeout") - .setApplicationFailureInfo( - ApplicationFailureInfo.newBuilder().setNonRetryable(true)) - .build(); - target.failNexusOperation(ref, timedOut); + target.failNexusOperation( + ref, + wrapNexusOperationFailure( + Failure.newBuilder() + .setMessage("operation exceeded internal timeout") + .setTimeoutFailureInfo(TimeoutFailureInfo.newBuilder().build()) + .build())); break; default: throw Status.INTERNAL @@ -959,13 +966,39 @@ public void completeNexusOperation( private static Failure handlerErrorToFailure(HandlerError err) { return Failure.newBuilder() .setMessage(err.getFailure().getMessage()) - .setApplicationFailureInfo( - ApplicationFailureInfo.newBuilder() - .setType(err.getErrorType()) - .setDetails(nexusFailureMetadataToPayloads(err.getFailure()))) + .setNexusHandlerFailureInfo( + NexusHandlerFailureInfo.newBuilder().setType(err.getErrorType()).build()) + .setCause(nexusFailureToAPIFailure(err.getFailure(), false)) .build(); } + /** + * nexusFailureToAPIFailure converts a Nexus Failure to an API proto Failure. If the failure + * metadata "type" field is set to the fullname of the temporal API Failure message, the failure + * is reconstructed using protojson.Unmarshal on the failure details field. + */ + private static Failure nexusFailureToAPIFailure( + io.temporal.api.nexus.v1.Failure failure, boolean retryable) { + Failure.Builder apiFailure = Failure.newBuilder(); + if (failure.getMetadataMap().containsKey("type") + && failure.getMetadataMap().get("type").equals(FAILURE_TYPE_STRING)) { + try { + JSON_PARSER.merge(failure.getDetails().toString(UTF_8), apiFailure); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } else { + Payloads payloads = nexusFailureMetadataToPayloads(failure); + ApplicationFailureInfo.Builder applicationFailureInfo = ApplicationFailureInfo.newBuilder(); + applicationFailureInfo.setType("NexusFailure"); + applicationFailureInfo.setDetails(payloads); + applicationFailureInfo.setNonRetryable(!retryable); + apiFailure.setApplicationFailureInfo(applicationFailureInfo.build()); + } + apiFailure.setMessage(failure.getMessage()); + return apiFailure.build(); + } + private static Payloads nexusFailureMetadataToPayloads(io.temporal.api.nexus.v1.Failure failure) { Map metadata = failure.getMetadataMap().entrySet().stream() diff --git a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java index 081e6752bd..5da6517bb8 100644 --- a/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java +++ b/temporal-test-server/src/test/java/io/temporal/testserver/functional/NexusWorkflowTest.java @@ -20,7 +20,7 @@ package io.temporal.testserver.functional; -import static org.junit.Assume.assumeFalse; +import static org.junit.Assume.assumeTrue; import com.google.protobuf.ByteString; import com.google.protobuf.util.Durations; @@ -34,6 +34,7 @@ import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.nexus.v1.*; import io.temporal.api.operatorservice.v1.CreateNexusEndpointRequest; +import io.temporal.api.operatorservice.v1.DeleteNexusEndpointRequest; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflowservice.v1.*; import io.temporal.client.WorkflowOptions; @@ -47,10 +48,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; public class NexusWorkflowTest { @Rule @@ -68,13 +66,12 @@ public class NexusWorkflowTest { @Before public void setup() { - // TODO: remove this skip once 1.25.0 is officially released and - // https://github.com/temporalio/sdk-java/issues/2165 is resolved - assumeFalse( - "Nexus APIs are not supported for server versions < 1.25.0", - testWorkflowRule.isUseExternalService()); + testEndpoint = createEndpoint("nexus-workflow-test-endpoint-" + UUID.randomUUID()); + } - testEndpoint = createEndpoint("nexus-workflow-test-endpoint"); + @After + public void tearDown() { + deleteEndpoint(testEndpoint); } @Test @@ -453,8 +450,7 @@ public void testNexusOperationAsyncHandlerTerminated() { Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); io.temporal.api.failure.v1.Failure cause = failure.getCause(); Assert.assertEquals("operation terminated", cause.getMessage()); - Assert.assertTrue(cause.hasApplicationFailureInfo()); - Assert.assertTrue(cause.getApplicationFailureInfo().getNonRetryable()); + Assert.assertTrue(cause.hasTerminatedFailureInfo()); } catch (Exception e) { Assert.fail(e.getMessage()); } finally { @@ -531,84 +527,7 @@ public void testNexusOperationAsyncHandlerTimeout() { Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); io.temporal.api.failure.v1.Failure cause = failure.getCause(); Assert.assertEquals("operation exceeded internal timeout", cause.getMessage()); - Assert.assertTrue(cause.hasApplicationFailureInfo()); - Assert.assertTrue(cause.getApplicationFailureInfo().getNonRetryable()); - } catch (Exception e) { - Assert.fail(e.getMessage()); - } finally { - nexusPoller.cancel(true); - } - } - - @Test - public void testNexusOperationCancellation() { - String operationId = UUID.randomUUID().toString(); - CompletableFuture nexusPoller = - pollNexusTask().thenCompose(task -> completeNexusTask(task, operationId)); - - try { - WorkflowStub stub = newWorkflowStub("TestNexusOperationCancellationWorkflow"); - WorkflowExecution execution = stub.start(); - - // Get first WFT and respond with ScheduleNexusOperation command - PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); - completeWorkflowTask( - pollResp.getTaskToken(), - newScheduleOperationCommand( - defaultScheduleOperationAttributes() - .setScheduleToCloseTimeout(Durations.fromSeconds(5)))); - testWorkflowRule.assertHistoryEvent( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); - - // Wait for operation to be started - nexusPoller.get(); - - // Poll and verify started event is recorded and triggers workflow progress - pollResp = pollWorkflowTask(); - testWorkflowRule.assertHistoryEvent( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED); - List events = - testWorkflowRule.getHistoryEvents( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); - Assert.assertEquals(1, events.size()); - - // Cancel operation - HistoryEvent scheduledEvent = events.get(0); - Command cancelCmd = - Command.newBuilder() - .setCommandType(CommandType.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION) - .setRequestCancelNexusOperationCommandAttributes( - RequestCancelNexusOperationCommandAttributes.newBuilder() - .setScheduledEventId(scheduledEvent.getEventId())) - .build(); - completeWorkflowTask(pollResp.getTaskToken(), cancelCmd); - - // Poll for and complete cancellation task - pollNexusTask() - .thenCompose( - task -> - completeNexusTask( - task, - Response.newBuilder() - .setCancelOperation(CancelOperationResponse.getDefaultInstance()) - .build())) - .get(); - - // Poll to verify cancellation is recorded and triggers workflow progress. - pollResp = pollWorkflowTask(); - completeWorkflow(pollResp.getTaskToken()); - - events = - testWorkflowRule.getHistoryEvents( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED); - Assert.assertEquals(1, events.size()); - io.temporal.api.failure.v1.Failure failure = - events.get(0).getNexusOperationCanceledEventAttributes().getFailure(); - assertOperationFailureInfo(operationId, failure.getNexusOperationExecutionFailureInfo()); - Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); - io.temporal.api.failure.v1.Failure cause = failure.getCause(); - Assert.assertEquals("operation canceled", cause.getMessage()); - Assert.assertTrue(cause.hasCanceledFailureInfo()); + Assert.assertTrue(cause.hasTimeoutFailureInfo()); } catch (Exception e) { Assert.fail(e.getMessage()); } finally { @@ -616,50 +535,6 @@ public void testNexusOperationCancellation() { } } - @Test - public void testNexusOperationCancelBeforeStart() { - WorkflowStub stub = newWorkflowStub("TestNexusOperationCancelBeforeStartWorkflow"); - WorkflowExecution execution = stub.start(); - - // Get first WFT and respond with ScheduleNexusOperation command - PollWorkflowTaskQueueResponse pollResp = pollWorkflowTask(); - completeWorkflowTask(pollResp.getTaskToken(), true, newScheduleOperationCommand()); - - // Poll for new WFT and respond with RequestCancelNexusOperation command - pollResp = pollWorkflowTask(); - - List events = - testWorkflowRule.getHistoryEvents( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED); - Assert.assertEquals(1, events.size()); - - HistoryEvent scheduledEvent = events.get(0); - Command cancelCmd = - Command.newBuilder() - .setCommandType(CommandType.COMMAND_TYPE_REQUEST_CANCEL_NEXUS_OPERATION) - .setRequestCancelNexusOperationCommandAttributes( - RequestCancelNexusOperationCommandAttributes.newBuilder() - .setScheduledEventId(scheduledEvent.getEventId())) - .build(); - completeWorkflowTask(pollResp.getTaskToken(), cancelCmd); - - // Poll and verify cancel triggers workflow progress - pollResp = pollWorkflowTask(); - completeWorkflow(pollResp.getTaskToken()); - - events = - testWorkflowRule.getHistoryEvents( - execution.getWorkflowId(), EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED); - Assert.assertEquals(1, events.size()); - io.temporal.api.failure.v1.Failure failure = - events.get(0).getNexusOperationCanceledEventAttributes().getFailure(); - assertOperationFailureInfo(failure.getNexusOperationExecutionFailureInfo()); - Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); - io.temporal.api.failure.v1.Failure cause = failure.getCause(); - Assert.assertEquals("operation canceled before it was started", cause.getMessage()); - Assert.assertTrue(cause.hasCanceledFailureInfo()); - } - @Test(timeout = 15000) public void testNexusOperationTimeout_BeforeStart() { WorkflowStub stub = newWorkflowStub("TestNexusOperationTimeoutBeforeStartWorkflow"); @@ -681,13 +556,12 @@ public void testNexusOperationTimeout_BeforeStart() { Assert.assertTrue(nexusPollResp.getRequest().hasStartOperation()); // Request timeout and long poll deadline are both 10s, so sleep to give some buffer so poll - // request doesn't timeout. + // request doesn't time out. Thread.sleep(2000); // Poll again to verify task is resent on timeout - nexusPollResp = pollNexusTask().get(); - NexusTaskToken ref = NexusTaskToken.fromBytes(nexusPollResp.getTaskToken()); - Assert.assertEquals(2, ref.getAttempt()); + PollNexusTaskQueueResponse nextNexusPollResp = pollNexusTask().get(); + Assert.assertTrue(!nexusPollResp.getTaskToken().equals(nextNexusPollResp.getTaskToken())); } catch (Exception e) { Assert.fail(e.getMessage()); } @@ -814,11 +688,9 @@ public void testNexusOperationTimeout_AfterCancel() { // request doesn't timeout. Thread.sleep(2000); - // Poll for cancellation task again to confirm it is retried on timeout + // Poll for cancellation task again nexusPollResp = pollNexusTask().get(); Assert.assertTrue(nexusPollResp.getRequest().hasCancelOperation()); - NexusTaskToken ref = NexusTaskToken.fromBytes(nexusPollResp.getTaskToken()); - Assert.assertTrue(ref.getAttempt() > 1); // Request timeout and long poll deadline are both 10s, so sleep to give some buffer so poll // request doesn't timeout. @@ -890,7 +762,7 @@ public void testNexusOperationError() { io.temporal.api.failure.v1.Failure cause = failure.getCause(); Assert.assertEquals("deliberate test failure", cause.getMessage()); Assert.assertTrue(cause.hasApplicationFailureInfo()); - Assert.assertEquals("NexusOperationFailure", cause.getApplicationFailureInfo().getType()); + Assert.assertEquals("NexusFailure", cause.getApplicationFailureInfo().getType()); } catch (Exception e) { Assert.fail(e.getMessage()); } finally { @@ -921,7 +793,7 @@ public void testNexusOperationHandlerError() { failNexusTask( task.getTaskToken(), HandlerError.newBuilder() - .setErrorType("INVALID_ARGUMENT") + .setErrorType("BAD_REQUEST") .setFailure( Failure.newBuilder() .setMessage("deliberate terminal error")) @@ -951,9 +823,9 @@ public void testNexusOperationHandlerError() { assertOperationFailureInfo(failure.getNexusOperationExecutionFailureInfo()); Assert.assertEquals("nexus operation completed unsuccessfully", failure.getMessage()); io.temporal.api.failure.v1.Failure cause = failure.getCause(); - Assert.assertEquals("deliberate terminal error", cause.getMessage()); - Assert.assertTrue(cause.hasApplicationFailureInfo()); - Assert.assertEquals("INVALID_ARGUMENT", cause.getApplicationFailureInfo().getType()); + Assert.assertTrue(cause.getMessage().endsWith("deliberate terminal error")); + Assert.assertTrue(cause.hasNexusHandlerFailureInfo()); + Assert.assertEquals("BAD_REQUEST", cause.getNexusHandlerFailureInfo().getType()); } catch (Exception e) { Assert.fail(e.getMessage()); } finally { @@ -963,6 +835,9 @@ public void testNexusOperationHandlerError() { @Test public void testNexusOperationInvalidRef() { + assumeTrue( + "Skipping for real server since this test is parsing and modifying the operation token", + !testWorkflowRule.isUseExternalService()); // Polls for nexus task -> respond with invalid task token -> respond with correct task token CompletableFuture nexusPoller = pollNexusTask() @@ -1218,6 +1093,18 @@ private Endpoint createEndpoint(String name) { .getEndpoint(); } + private void deleteEndpoint(Endpoint endpoint) { + testWorkflowRule + .getTestEnvironment() + .getOperatorServiceStubs() + .blockingStub() + .deleteNexusEndpoint( + DeleteNexusEndpointRequest.newBuilder() + .setId(endpoint.getId()) + .setVersion(endpoint.getVersion()) + .build()); + } + public static class EchoNexusHandlerWorkflowImpl implements TestWorkflows.PrimitiveNexusHandlerWorkflow { @Override diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 93eb971dd3..646891fd3f 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue; import com.uber.m3.tally.Scope; -import io.nexusrpc.OperationUnsuccessfulException; +import io.nexusrpc.OperationException; import io.nexusrpc.handler.OperationContext; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; @@ -495,7 +495,7 @@ public void init(NexusOperationOutboundCallsInterceptor outboundCalls) { @Override public StartOperationOutput startOperation(StartOperationInput input) - throws OperationUnsuccessfulException { + throws OperationException { trace.add( "startNexusOperation " + input.getOperationContext().getService()