diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java index de5de97b7526d..95f54b69de1f1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java @@ -298,6 +298,16 @@ CompletableFuture getFinishApplicationFuture() { return finishApplicationFuture; } + @VisibleForTesting + public Configuration getConfiguration() { + return configuration; + } + + @VisibleForTesting + public PackagedProgram getPackagedProgram() { + return program; + } + private CompletableFuture onApplicationCanceledOrFailed( final DispatcherGateway dispatcherGateway, final ScheduledExecutor scheduledExecutor, diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java index 93ff7baae9878..188ee9ece3ccb 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java @@ -32,6 +32,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JarPlanGetHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler; import org.apache.flink.runtime.webmonitor.handlers.JarPlanPostHeaders; +import org.apache.flink.runtime.webmonitor.handlers.JarRunApplicationHandler; +import org.apache.flink.runtime.webmonitor.handlers.JarRunApplicationHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; @@ -60,6 +62,7 @@ public class WebSubmissionExtension implements WebMonitorExtension { // for easier access during testing private final JarUploadHandler jarUploadHandler; private final JarRunHandler jarRunHandler; + private final JarRunApplicationHandler jarRunApplicationHandler; public WebSubmissionExtension( Configuration configuration, @@ -132,6 +135,15 @@ public WebSubmissionExtension( jarRunExecutor, applicationRunnerSupplier); + jarRunApplicationHandler = + new JarRunApplicationHandler( + leaderRetriever, + timeout, + responseHeaders, + JarRunApplicationHeaders.getInstance(), + jarDir, + configuration); + final JarDeleteHandler jarDeleteHandler = new JarDeleteHandler( leaderRetriever, @@ -164,6 +176,8 @@ public WebSubmissionExtension( webSubmissionHandlers.add(Tuple2.of(JarUploadHeaders.getInstance(), jarUploadHandler)); webSubmissionHandlers.add(Tuple2.of(JarListHeaders.getInstance(), jarListHandler)); webSubmissionHandlers.add(Tuple2.of(JarRunHeaders.getInstance(), jarRunHandler)); + webSubmissionHandlers.add( + Tuple2.of(JarRunApplicationHeaders.getInstance(), jarRunApplicationHandler)); webSubmissionHandlers.add(Tuple2.of(JarDeleteHeaders.getInstance(), jarDeleteHandler)); webSubmissionHandlers.add(Tuple2.of(JarPlanGetHeaders.getInstance(), jarPlanHandler)); webSubmissionHandlers.add(Tuple2.of(JarPlanPostHeaders.getInstance(), postJarPlanHandler)); @@ -188,4 +202,9 @@ JarUploadHandler getJarUploadHandler() { JarRunHandler getJarRunHandler() { return jarRunHandler; } + + @VisibleForTesting + JarRunApplicationHandler getJarRunApplicationHandler() { + return jarRunApplicationHandler; + } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java new file mode 100644 index 0000000000000..ec8389f4e7174 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.client.deployment.application.PackagedProgramApplication; +import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor; +import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.core.execution.RecoveryClaimMode; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.nio.file.Path; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter; +import static org.apache.flink.shaded.guava33.com.google.common.base.Strings.emptyToNull; + +/** Handler to submit applications uploaded via the Web UI. */ +public class JarRunApplicationHandler + extends AbstractRestHandler< + DispatcherGateway, + JarRunApplicationRequestBody, + JarRunApplicationResponseBody, + JarRunApplicationMessageParameters> { + + private final Path jarDir; + + private final Configuration configuration; + + public JarRunApplicationHandler( + final GatewayRetriever leaderRetriever, + final Duration timeout, + final Map responseHeaders, + final MessageHeaders< + JarRunApplicationRequestBody, + JarRunApplicationResponseBody, + JarRunApplicationMessageParameters> + messageHeaders, + final Path jarDir, + final Configuration configuration) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders); + + this.jarDir = requireNonNull(jarDir); + this.configuration = requireNonNull(configuration); + } + + @Override + @VisibleForTesting + public CompletableFuture handleRequest( + @Nonnull final HandlerRequest request, + @Nonnull final DispatcherGateway gateway) + throws RestHandlerException { + + final Configuration effectiveConfiguration = new Configuration(configuration); + effectiveConfiguration.set(DeploymentOptions.TARGET, EmbeddedExecutor.NAME); + + final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log); + context.applyToConfiguration(effectiveConfiguration, request); + SavepointRestoreSettings.toConfiguration( + getSavepointRestoreSettings(request, effectiveConfiguration), + effectiveConfiguration); + + final PackagedProgram program = context.toPackagedProgram(effectiveConfiguration); + + ApplicationID applicationId = context.getApplicationId().orElse(ApplicationID.generate()); + PackagedProgramApplication application = + new PackagedProgramApplication( + applicationId, + program, + Collections.emptyList(), + effectiveConfiguration, + false, + true, + false, + false); + + return gateway.submitApplication(application, timeout) + .handle( + (acknowledge, throwable) -> { + if (throwable != null) { + throw new CompletionException( + new RestHandlerException( + "Could not submit application.", + HttpResponseStatus.BAD_REQUEST, + throwable)); + } + return new JarRunApplicationResponseBody(applicationId); + }); + } + + private SavepointRestoreSettings getSavepointRestoreSettings( + final @Nonnull HandlerRequest request, + final Configuration effectiveConfiguration) + throws RestHandlerException { + + final JarRunApplicationRequestBody requestBody = request.getRequestBody(); + + final boolean allowNonRestoredState = + fromRequestBodyOrQueryParameter( + requestBody.getAllowNonRestoredState().orElse(null), + () -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class), + effectiveConfiguration.get( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE), + log); + final String savepointPath = + fromRequestBodyOrQueryParameter( + emptyToNull(requestBody.getSavepointPath().orElse(null)), + () -> + emptyToNull( + getQueryParameter( + request, SavepointPathQueryParameter.class)), + effectiveConfiguration.get(StateRecoveryOptions.SAVEPOINT_PATH), + log); + final RecoveryClaimMode recoveryClaimMode = + requestBody + .getRecoveryClaimMode() + .orElse(effectiveConfiguration.get(StateRecoveryOptions.RESTORE_MODE)); + if (recoveryClaimMode.equals(RecoveryClaimMode.LEGACY)) { + log.warn( + "The {} restore mode is deprecated, please use {} or {} mode instead.", + RecoveryClaimMode.LEGACY, + RecoveryClaimMode.CLAIM, + RecoveryClaimMode.NO_CLAIM); + } + final SavepointRestoreSettings savepointRestoreSettings; + if (savepointPath != null) { + savepointRestoreSettings = + SavepointRestoreSettings.forPath( + savepointPath, allowNonRestoredState, recoveryClaimMode); + } else { + savepointRestoreSettings = SavepointRestoreSettings.none(); + } + return savepointRestoreSettings; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHeaders.java new file mode 100644 index 0000000000000..a8dbe70158286 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHeaders.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** {@link MessageHeaders} for {@link JarRunApplicationHandler}. */ +public class JarRunApplicationHeaders + implements RuntimeMessageHeaders< + JarRunApplicationRequestBody, + JarRunApplicationResponseBody, + JarRunApplicationMessageParameters> { + + private static final JarRunApplicationHeaders INSTANCE = new JarRunApplicationHeaders(); + + private static final String URL = "/jars/:" + JarIdPathParameter.KEY + "/run-application"; + + private JarRunApplicationHeaders() {} + + @Override + public Class getResponseClass() { + return JarRunApplicationResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.ACCEPTED; + } + + @Override + public Class getRequestClass() { + return JarRunApplicationRequestBody.class; + } + + @Override + public JarRunApplicationMessageParameters getUnresolvedMessageParameters() { + return new JarRunApplicationMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JarRunApplicationHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Submits an application by running a jar previously uploaded via '" + + JarUploadHeaders.URL + + "'. "; + } + + @Override + public String operationId() { + return "submitApplicationFromJar"; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationMessageParameters.java new file mode 100644 index 0000000000000..235231da61d14 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationMessageParameters.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.MessageParameters; + +/** + * {@link MessageParameters} for {@link JarRunApplicationHandler}. + * + *

Currently identical to {@link JarRunMessageParameters}. + */ +public class JarRunApplicationMessageParameters extends JarRunMessageParameters {} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBody.java new file mode 100644 index 0000000000000..b7bb0f577c1f3 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBody.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.execution.RecoveryClaimMode; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * {@link RequestBody} for running a jar as an application. + * + *

Nearly identical to {@link JarRunRequestBody}, but includes additional fields specific to for + * application and omits deprecated fields. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JarRunApplicationRequestBody extends JarRequestBody { + private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState"; + + private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath"; + + private static final String FIELD_NAME_SAVEPOINT_CLAIM_MODE = "claimMode"; + + private static final String FIELD_NAME_APPLICATION_ID = "applicationId"; + + @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) + @Nullable + private final Boolean allowNonRestoredState; + + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) + @Nullable + private final String savepointPath; + + @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE) + @Nullable + private final RecoveryClaimMode recoveryClaimMode; + + @JsonProperty(FIELD_NAME_APPLICATION_ID) + @JsonDeserialize(using = ApplicationIDDeserializer.class) + @JsonSerialize(using = ApplicationIDSerializer.class) + @Nullable + private final ApplicationID applicationId; + + public JarRunApplicationRequestBody() { + this(null, null, null, null, null, null, null, null, null); + } + + @JsonCreator + public JarRunApplicationRequestBody( + @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName, + @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) + List programArgumentsList, + @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId, + @Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) + Boolean allowNonRestoredState, + @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath, + @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE) + RecoveryClaimMode recoveryClaimMode, + @Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION) + Map flinkConfiguration, + @Nullable @JsonProperty(FIELD_NAME_APPLICATION_ID) ApplicationID applicationId) { + super(entryClassName, programArgumentsList, parallelism, jobId, flinkConfiguration); + this.allowNonRestoredState = allowNonRestoredState; + this.savepointPath = savepointPath; + this.recoveryClaimMode = recoveryClaimMode; + this.applicationId = applicationId; + } + + @JsonIgnore + public Optional getAllowNonRestoredState() { + return Optional.ofNullable(allowNonRestoredState); + } + + @JsonIgnore + public Optional getSavepointPath() { + return Optional.ofNullable(savepointPath); + } + + @JsonIgnore + public Optional getRecoveryClaimMode() { + return Optional.ofNullable(recoveryClaimMode); + } + + @JsonIgnore + public Optional getApplicationId() { + return Optional.ofNullable(applicationId); + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBody.java new file mode 100644 index 0000000000000..74394efe95417 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBody.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import static java.util.Objects.requireNonNull; + +/** Response for {@link JarRunApplicationHandler}. */ +public class JarRunApplicationResponseBody implements ResponseBody { + + @JsonProperty("applicationid") + @JsonDeserialize(using = ApplicationIDDeserializer.class) + @JsonSerialize(using = ApplicationIDSerializer.class) + private final ApplicationID applicationId; + + @JsonCreator + public JarRunApplicationResponseBody( + @JsonProperty("applicationid") @JsonDeserialize(using = ApplicationIDDeserializer.class) + final ApplicationID applicationId) { + this.applicationId = requireNonNull(applicationId); + } + + public ApplicationID getApplicationId() { + return applicationId; + } +} diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java index 5503593758ca6..7025d9102ee16 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer; +import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer; import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer; import org.apache.flink.runtime.rest.messages.json.JobIDSerializer; @@ -33,19 +36,31 @@ /** Response for {@link JarRunHandler}. */ public class JarRunResponseBody implements ResponseBody { + private static final String FIELD_NAME_APPLICATION_ID = "applicationId"; + @JsonProperty("jobid") @JsonDeserialize(using = JobIDDeserializer.class) @JsonSerialize(using = JobIDSerializer.class) private final JobID jobId; + @JsonProperty(FIELD_NAME_APPLICATION_ID) + @JsonDeserialize(using = ApplicationIDDeserializer.class) + @JsonSerialize(using = ApplicationIDSerializer.class) + private final ApplicationID applicationId; + @JsonCreator public JarRunResponseBody( @JsonProperty("jobid") @JsonDeserialize(using = JobIDDeserializer.class) final JobID jobId) { this.jobId = requireNonNull(jobId); + this.applicationId = ApplicationID.fromHexString(jobId.toHexString()); } public JobID getJobId() { return jobId; } + + public ApplicationID getApplicationId() { + return applicationId; + } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java index a48f230eac3be..cb8d21f22dfa0 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers.utils; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ApplicationID; import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.webmonitor.handlers.EntryClassQueryParameter; import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter; import org.apache.flink.runtime.webmonitor.handlers.JarRequestBody; +import org.apache.flink.runtime.webmonitor.handlers.JarRunApplicationRequestBody; import org.apache.flink.runtime.webmonitor.handlers.ParallelismQueryParameter; import org.apache.flink.runtime.webmonitor.handlers.ProgramArgQueryParameter; @@ -52,6 +54,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -76,18 +79,21 @@ public static class JarHandlerContext { private final List programArgs; private final int parallelism; private final JobID jobId; + @Nullable final ApplicationID applicationId; private JarHandlerContext( Path jarFile, String entryClass, List programArgs, int parallelism, - JobID jobId) { + JobID jobId, + @Nullable ApplicationID applicationId) { this.jarFile = jarFile; this.entryClass = entryClass; this.programArgs = programArgs; this.parallelism = parallelism; this.jobId = jobId; + this.applicationId = applicationId; } public static JarHandlerContext fromRequest( @@ -128,7 +134,18 @@ public static JarHandlerContext fromRequest( null, // Delegate default job ID to actual JobGraph generation log); - return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism, jobId); + final ApplicationID applicationId; + if (requestBody instanceof JarRunApplicationRequestBody) { + applicationId = + ((JarRunApplicationRequestBody) requestBody) + .getApplicationId() + .orElse(null); + } else { + applicationId = null; + } + + return new JarHandlerContext( + jarFile, entryClass, programArgs, parallelism, jobId, applicationId); } public void applyToConfiguration( @@ -217,6 +234,10 @@ int getParallelism() { JobID getJobId() { return jobId; } + + public Optional getApplicationId() { + return Optional.ofNullable(applicationId); + } } private static List getClasspaths(Configuration configuration) { diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandlerParameterTest.java new file mode 100644 index 0000000000000..261a6dc4c6e8e --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandlerParameterTest.java @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.deployment.application.PackagedProgramApplication; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.PipelineOptionsInternal; +import org.apache.flink.configuration.StateRecoveryOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.RecoveryClaimMode; +import org.apache.flink.core.testutils.AllCallbackWrapper; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.messages.MessageParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.util.BlobServerExtension; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class JarRunApplicationHandlerParameterTest { + static final String[] PROG_ARGS = new String[] {"--host", "localhost", "--port", "1234"}; + static final int PARALLELISM = 4; + static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true; + static final String RESTORE_PATH = "/foo/bar"; + static final RecoveryClaimMode RESTORE_MODE = RecoveryClaimMode.CLAIM; + + @RegisterExtension + private static final AllCallbackWrapper blobServerExtension = + new AllCallbackWrapper<>(new BlobServerExtension()); + + static final AtomicReference LAST_SUBMITTED_APPLICATION_REFERENCE = + new AtomicReference<>(); + + static TestingDispatcherGateway restfulGateway; + static Path jarDir; + static GatewayRetriever gatewayRetriever = + () -> CompletableFuture.completedFuture(restfulGateway); + static CompletableFuture localAddressFuture = + CompletableFuture.completedFuture("shazam://localhost:12345"); + static Duration timeout = Duration.ofSeconds(10); + static Map responseHeaders = Collections.emptyMap(); + + protected static Path jarWithManifest; + private static Path jarWithoutManifest; + + private static JarRunApplicationHandler handler; + + private static final Configuration FLINK_CONFIGURATION = + new Configuration() + .set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(120000L)) + .set(CoreOptions.DEFAULT_PARALLELISM, 57) + .set(StateRecoveryOptions.SAVEPOINT_PATH, "/foo/bar/test") + .set(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false) + .set(StateRecoveryOptions.RESTORE_MODE, RESTORE_MODE) + .set( + PipelineOptions.PARALLELISM_OVERRIDES, + new HashMap<>() { + { + put("v1", "10"); + } + }); + + @BeforeAll + static void setup(@TempDir File tempDir) throws Exception { + init(tempDir); + final GatewayRetriever gatewayRetriever = + () -> CompletableFuture.completedFuture(restfulGateway); + final Duration timeout = Duration.ofSeconds(10); + final Map responseHeaders = Collections.emptyMap(); + + handler = + new JarRunApplicationHandler( + gatewayRetriever, + timeout, + responseHeaders, + JarRunApplicationHeaders.getInstance(), + jarDir, + new Configuration()); + } + + static void init(File tmpDir) throws Exception { + jarDir = tmpDir.toPath(); + + // properties are set property by surefire plugin + final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar"; + final String parameterProgramWithoutManifestJarName = + System.getProperty("parameterJarWithoutManifestName") + ".jar"; + final Path jarLocation = Paths.get(System.getProperty("targetDir")); + + jarWithManifest = + Files.copy( + jarLocation.resolve(parameterProgramJarName), + jarDir.resolve("program-with-manifest.jar")); + jarWithoutManifest = + Files.copy( + jarLocation.resolve(parameterProgramWithoutManifestJarName), + jarDir.resolve("program-without-manifest.jar")); + + restfulGateway = + TestingDispatcherGateway.newBuilder() + .setBlobServerPort( + blobServerExtension.getCustomExtension().getBlobServerPort()) + .setSubmitApplicationFunction( + application -> { + if (application instanceof PackagedProgramApplication) { + LAST_SUBMITTED_APPLICATION_REFERENCE.set( + (PackagedProgramApplication) application); + return CompletableFuture.completedFuture(Acknowledge.get()); + } + return FutureUtils.completedExceptionally( + new FlinkRuntimeException( + "Unsupported application type")); + }) + .build(); + + gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway); + localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345"); + timeout = Duration.ofSeconds(10); + responseHeaders = Collections.emptyMap(); + } + + @Test + void testDefaultParameters() throws Exception { + // baseline, ensure that reasonable defaults are chosen + handleRequest( + createRequest( + getDefaultJarRequestBody(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest)); + + validateDefaultApplication(); + } + + @Test + void testExceptionThrownWithoutManifestOrEntryClass() throws Exception { + final HandlerRequest request = + createRequest( + getDefaultJarRequestBody(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithoutManifest); + + assertThatThrownBy(() -> handler.handleRequest(request, restfulGateway).get()) + .matches( + e -> { + final Throwable throwable = + ExceptionUtils.stripCompletionException(e.getCause()); + + final Optional invocationException = + ExceptionUtils.findThrowable( + throwable, ProgramInvocationException.class); + + assertThat(invocationException).isPresent(); + + final String exceptionMsg = invocationException.get().getMessage(); + assertThat(exceptionMsg) + .contains( + "Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file."); + + return true; + }); + } + + @Test + void testQueryParameters() throws Exception { + handleRequest( + createRequest( + getDefaultJarRequestBody(), + getJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithoutManifest)); + + validateApplication(); + } + + @Test + void testRequestBody() throws Exception { + handleRequest( + createRequest( + getJarRequestBody(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithoutManifest)); + + validateApplication(); + } + + @Test + void testConfigurationViaRequestBody() throws Exception { + handleRequest( + createRequest( + getJarRequestWithConfiguration(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest)); + + validateApplicationWithConfiguration(); + } + + @Test + void testJobId() throws Exception { + final JobID jobId = new JobID(); + handleRequest( + createRequest( + getJarRequestBodyWithJobId(jobId), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest)); + + final PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + + assertThat( + application + .getConfiguration() + .get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)) + .isEqualTo(jobId.toHexString()); + } + + @Test + void testApplicationId() throws Exception { + final ApplicationID applicationId = new ApplicationID(); + handleRequest( + createRequest( + getJarRequestBodyWithApplicationId(applicationId), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest)); + + final PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + + assertThat(application.getApplicationId()).isEqualTo(applicationId); + } + + @Test + void testParameterPrioritization() throws Exception { + // parameters via query parameters and JSON request, JSON should be prioritized + handleRequest( + createRequest( + getJarRequestBody(), + getWrongJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithoutManifest)); + + validateApplication(); + } + + @Test + void testExceptionNotThrownWithEagerSink() throws Exception { + final Path jarLocation = Paths.get(System.getProperty("targetDir")); + final String parameterProgramWithEagerSink = "parameter-program-with-eager-sink.jar"; + Path jarWithEagerSink = + Files.copy( + jarLocation.resolve(parameterProgramWithEagerSink), + jarDir.resolve("program-with-eager-sink.jar")); + + // the handler do not run the program and should not throw an exception + handleRequest( + createRequest( + getDefaultJarRequestBody(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithEagerSink)); + + validateDefaultApplication(); + } + + @Test + void testEmptySavepointPath() throws Exception { + handleRequest( + createRequest( + getJarRequestBodyWithSavepointPath(""), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest)); + + final PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.fromConfiguration(application.getConfiguration()); + + assertThat(savepointRestoreSettings).isEqualTo(SavepointRestoreSettings.none()); + } + + @Test + void testParallelismOverrides() throws Exception { + handleRequest( + createRequest( + getJarRequestWithConfiguration(), + getUnresolvedJarMessageParameters(), + getUnresolvedJarMessageParameters(), + jarWithManifest)); + + final PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + + assertThat(application.getConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES)) + .containsOnlyKeys("v1") + .containsEntry("v1", "10"); + } + + protected static HandlerRequest createRequest( + JarRunApplicationRequestBody requestBody, + JarRunApplicationMessageParameters parameters, + JarRunApplicationMessageParameters unresolvedMessageParameters, + Path jar) + throws HandlerRequestException { + + final Map> queryParameterAsMap = + parameters.getQueryParameters().stream() + .filter(MessageParameter::isResolved) + .collect( + Collectors.toMap( + MessageParameter::getKey, + JarRunApplicationHandlerParameterTest::getValuesAsString)); + + return HandlerRequest.resolveParametersAndCreate( + requestBody, + unresolvedMessageParameters, + Collections.singletonMap(JarIdPathParameter.KEY, jar.getFileName().toString()), + queryParameterAsMap, + Collections.emptyList()); + } + + private static List getValuesAsString(MessageQueryParameter parameter) { + final List values = parameter.getValue(); + return values.stream().map(parameter::convertValueToString).collect(Collectors.toList()); + } + + JarRunApplicationMessageParameters getUnresolvedJarMessageParameters() { + return handler.getMessageHeaders().getUnresolvedMessageParameters(); + } + + JarRunApplicationMessageParameters getJarMessageParameters() { + final JarRunApplicationMessageParameters parameters = getUnresolvedJarMessageParameters(); + parameters.allowNonRestoredStateQueryParameter.resolve( + Collections.singletonList(ALLOW_NON_RESTORED_STATE_QUERY)); + parameters.savepointPathQueryParameter.resolve(Collections.singletonList(RESTORE_PATH)); + parameters.entryClassQueryParameter.resolve( + Collections.singletonList(ParameterProgram.class.getCanonicalName())); + parameters.parallelismQueryParameter.resolve(Collections.singletonList(PARALLELISM)); + parameters.programArgQueryParameter.resolve(Arrays.asList(PROG_ARGS)); + return parameters; + } + + JarRunApplicationMessageParameters getWrongJarMessageParameters() { + List wrongArgs = + Arrays.stream(PROG_ARGS).map(a -> a + "wrong").collect(Collectors.toList()); + final JarRunApplicationMessageParameters parameters = getUnresolvedJarMessageParameters(); + parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false)); + parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh")); + parameters.entryClassQueryParameter.resolve( + Collections.singletonList("please.dont.run.me")); + parameters.parallelismQueryParameter.resolve(Collections.singletonList(64)); + parameters.programArgQueryParameter.resolve(wrongArgs); + return parameters; + } + + JarRunApplicationRequestBody getDefaultJarRequestBody() { + return new JarRunApplicationRequestBody(); + } + + JarRunApplicationRequestBody getJarRequestBody() { + return new JarRunApplicationRequestBody( + ParameterProgram.class.getCanonicalName(), + Arrays.asList(PROG_ARGS), + PARALLELISM, + null, + ALLOW_NON_RESTORED_STATE_QUERY, + RESTORE_PATH, + RESTORE_MODE, + FLINK_CONFIGURATION.toMap(), + null); + } + + private JarRunApplicationRequestBody getJarRequestBodyWithSavepointPath(String savepointPath) { + return new JarRunApplicationRequestBody( + ParameterProgram.class.getCanonicalName(), + Arrays.asList(PROG_ARGS), + PARALLELISM, + null, + ALLOW_NON_RESTORED_STATE_QUERY, + savepointPath, + RESTORE_MODE, + null, + null); + } + + JarRunApplicationRequestBody getJarRequestBodyWithJobId(JobID jobId) { + return new JarRunApplicationRequestBody( + null, null, null, jobId, null, null, null, null, null); + } + + JarRunApplicationRequestBody getJarRequestBodyWithApplicationId(ApplicationID applicationId) { + return new JarRunApplicationRequestBody( + null, null, null, null, null, null, null, null, applicationId); + } + + JarRunApplicationRequestBody getJarRequestWithConfiguration() { + return new JarRunApplicationRequestBody( + null, null, null, null, null, null, null, FLINK_CONFIGURATION.toMap(), null); + } + + void handleRequest(HandlerRequest request) throws Exception { + handler.handleRequest(request, restfulGateway).get(); + } + + void validateDefaultApplication() { + PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + + assertThat(application.getPackagedProgram().getArguments()).isEmpty(); + + final Configuration configuration = application.getConfiguration(); + assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM)) + .isEqualTo(CoreOptions.DEFAULT_PARALLELISM.defaultValue().intValue()); + + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.fromConfiguration(configuration); + assertThat(savepointRestoreSettings.allowNonRestoredState()).isFalse(); + assertThat(savepointRestoreSettings.getRestorePath()).isNull(); + } + + void validateApplication() { + PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + + assertThat(application.getPackagedProgram().getArguments()).isEqualTo(PROG_ARGS); + + final Configuration configuration = application.getConfiguration(); + assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM)).isEqualTo(PARALLELISM); + + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.fromConfiguration(configuration); + assertThat(savepointRestoreSettings.allowNonRestoredState()) + .isEqualTo(ALLOW_NON_RESTORED_STATE_QUERY); + assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo(RESTORE_PATH); + } + + void validateApplicationWithConfiguration() { + PackagedProgramApplication application = + LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null); + final Configuration configuration = application.getConfiguration(); + + assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM)) + .isEqualTo(FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM)); + assertThat(configuration.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)) + .isEqualTo(FLINK_CONFIGURATION.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT)); + + final SavepointRestoreSettings savepointRestoreSettings = + SavepointRestoreSettings.fromConfiguration(configuration); + assertThat(savepointRestoreSettings.getRecoveryClaimMode()) + .isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.RESTORE_MODE)); + assertThat(savepointRestoreSettings.getRestorePath()) + .isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.SAVEPOINT_PATH)); + assertThat(savepointRestoreSettings.allowNonRestoredState()) + .isEqualTo( + FLINK_CONFIGURATION.get( + StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE)); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBodyTest.java new file mode 100644 index 0000000000000..49cce9161782e --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBodyTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.execution.RecoveryClaimMode; +import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link JarRunApplicationRequestBody}. */ +@ExtendWith(NoOpTestExtension.class) +class JarRunApplicationRequestBodyTest + extends RestRequestMarshallingTestBase { + + @Override + protected Class getTestRequestClass() { + return JarRunApplicationRequestBody.class; + } + + @Override + protected JarRunApplicationRequestBody getTestRequestInstance() { + return new JarRunApplicationRequestBody( + "hello", + Arrays.asList("boo", "far"), + 4, + new JobID(), + true, + "foo/bar", + RecoveryClaimMode.CLAIM, + Collections.singletonMap("key", "value"), + new ApplicationID()); + } + + @Override + protected void assertOriginalEqualsToUnmarshalled( + final JarRunApplicationRequestBody expected, + final JarRunApplicationRequestBody actual) { + assertThat(actual.getEntryClassName()).isEqualTo(expected.getEntryClassName()); + assertThat(actual.getProgramArgumentsList()).isEqualTo(expected.getProgramArgumentsList()); + assertThat(actual.getParallelism()).isEqualTo(expected.getParallelism()); + assertThat(actual.getJobId()).isEqualTo(expected.getJobId()); + assertThat(actual.getAllowNonRestoredState()) + .isEqualTo(expected.getAllowNonRestoredState()); + assertThat(actual.getSavepointPath()).isEqualTo(expected.getSavepointPath()); + assertThat(actual.getRecoveryClaimMode()).isEqualTo(expected.getRecoveryClaimMode()); + assertThat(actual.getFlinkConfiguration().toMap()) + .containsExactlyEntriesOf(expected.getFlinkConfiguration().toMap()); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBodyTest.java new file mode 100644 index 0000000000000..673a2b91f4146 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBodyTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ApplicationID; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension; + +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link JarRunApplicationResponseBody}. */ +@ExtendWith(NoOpTestExtension.class) +public class JarRunApplicationResponseBodyTest + extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return JarRunApplicationResponseBody.class; + } + + @Override + protected JarRunApplicationResponseBody getTestResponseInstance() { + return new JarRunApplicationResponseBody(new ApplicationID()); + } + + @Override + protected void assertOriginalEqualsToUnmarshalled( + final JarRunApplicationResponseBody expected, + final JarRunApplicationResponseBody actual) { + assertThat(actual.getApplicationId()).isEqualTo(expected.getApplicationId()); + } +} diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java index 1a30ec54ad59a..13f9f3e21153c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java @@ -44,5 +44,6 @@ protected JarRunResponseBody getTestResponseInstance() throws Exception { protected void assertOriginalEqualsToUnmarshalled( final JarRunResponseBody expected, final JarRunResponseBody actual) { assertThat(actual.getJobId()).isEqualTo(expected.getJobId()); + assertThat(actual.getApplicationId()).isEqualTo(expected.getApplicationId()); } } diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 406ebe85ef005..87a995c854b18 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -765,6 +765,86 @@ "properties" : { "jobid" : { "type" : "any" + }, + "applicationId" : { + "type" : "any" + } + } + } + }, { + "url" : "/jars/:jarid/run-application", + "method" : "POST", + "status-code" : "202 Accepted", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jarid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ { + "key" : "allowNonRestoredState", + "mandatory" : false + }, { + "key" : "savepointPath", + "mandatory" : false + }, { + "key" : "programArg", + "mandatory" : false + }, { + "key" : "entry-class", + "mandatory" : false + }, { + "key" : "parallelism", + "mandatory" : false + } ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunApplicationRequestBody", + "properties" : { + "entryClass" : { + "type" : "string" + }, + "programArgsList" : { + "type" : "array", + "items" : { + "type" : "string" + } + }, + "parallelism" : { + "type" : "integer" + }, + "jobId" : { + "type" : "any" + }, + "allowNonRestoredState" : { + "type" : "boolean" + }, + "savepointPath" : { + "type" : "string" + }, + "claimMode" : { + "type" : "string", + "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ] + }, + "flinkConfiguration" : { + "type" : "object", + "additionalProperties" : { + "type" : "string" + } + }, + "applicationId" : { + "type" : "any" + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunApplicationResponseBody", + "properties" : { + "applicationid" : { + "type" : "any" } } }