diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index d2047ced59..3e35c388b4 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -97,6 +97,13 @@ object Utils extends Logger { new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest } + def getJarManClass(jarFile: File): String = { + val manifest = getJarManifest(jarFile) + val mainAttr = manifest.getMainAttributes + Option(mainAttr.getValue("Main-Class")) + .getOrElse(Option(mainAttr.getValue("program-class")).orNull) + } + def copyProperties(original: Properties, target: Properties): Unit = original.foreach(x => target.put(x._1, x._2)) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java index 9193cefa35..5ffa7a02ee 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java @@ -86,6 +86,8 @@ public interface ApplicationService extends IService { boolean checkAlter(Application application); + Map getRumtimeConfig(Long id); + void updateRelease(Application application); List getByProjectId(Long id); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 49f7a32f1c..c58fcd5f20 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -136,6 +136,7 @@ import java.util.Arrays; import java.util.Base64; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.EnumSet; @@ -152,7 +153,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.jar.Manifest; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -403,11 +403,6 @@ public Boolean delete(Application paramApp) { // 8) remove app removeApp(application); - if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(application)); - } else { - FlinkRESTAPIWatcher.unWatching(paramApp.getId()); - } return true; } @@ -460,6 +455,23 @@ public boolean checkAlter(Application application) { return cancelUserId != -1 && cancelUserId != appUserId; } + @Override + public Map getRumtimeConfig(Long id) { + Application application = getById(id); + if (application != null && application.getVersionId() != null) { + FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId()); + if (flinkEnv != null) { + File yaml = new File(flinkEnv.getFlinkHome().concat("/conf/flink-conf.yaml")); + Map config = PropertiesUtils.loadFlinkConfYaml(yaml); + Map dynamicConf = + PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties()); + config.putAll(dynamicConf); + return config; + } + } + return Collections.emptyMap(); + } + private void removeApp(Application application) { Long appId = application.getId(); removeById(appId); @@ -695,7 +707,7 @@ public AppExistsState checkExists(Application appParam) { } // check whether clusterId, namespace, jobId on kubernetes else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode()) - && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) { + && k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) { return AppExistsState.IN_KUBERNETES; } } @@ -1135,10 +1147,7 @@ public void forcedStop(Application app) { if (isKubernetesApp(application)) { KubernetesDeploymentHelper.watchPodTerminatedLog( application.getK8sNamespace(), application.getJobName(), application.getJobId()); - KubernetesDeploymentHelper.deleteTaskDeployment( - application.getK8sNamespace(), application.getJobName()); - KubernetesDeploymentHelper.deleteTaskConfigMap( - application.getK8sNamespace(), application.getJobName()); + KubernetesDeploymentHelper.delete(application.getK8sNamespace(), application.getJobName()); } if (startFuture != null) { startFuture.cancel(true); @@ -1217,8 +1226,7 @@ public String getMain(Application application) { project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule()); jarFile = new File(modulePath, application.getJar()); } - Manifest manifest = Utils.getJarManifest(jarFile); - return manifest.getMainAttributes().getValue("Main-Class"); + return Utils.getJarManClass(jarFile); } @Override @@ -1322,6 +1330,8 @@ public void cancel(Application appParam) throws Exception { cancelFutureMap.put(application.getId(), cancelFuture); + TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : null; + cancelFuture.whenComplete( (cancelResponse, throwable) -> { cancelFutureMap.remove(application.getId()); @@ -1345,9 +1355,8 @@ public void cancel(Application appParam) throws Exception { } // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { - TrackId id = toTrackId(application); - k8SFlinkTrackMonitor.unWatching(id); - k8SFlinkTrackMonitor.doWatching(id); + KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId()); + k8SFlinkTrackMonitor.unWatching(trackId); } else { FlinkRESTAPIWatcher.unWatching(application.getId()); } @@ -1373,7 +1382,8 @@ public void cancel(Application appParam) throws Exception { } if (isKubernetesApp(application)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(application)); + KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId()); + k8SFlinkTrackMonitor.unWatching(trackId); } }); } @@ -1389,24 +1399,22 @@ public String checkSavepointPath(Application appParam) throws Exception { final URI uri = URI.create(savepointPath); final String scheme = uri.getScheme(); final String pathPart = uri.getPath(); - String error = null; if (scheme == null) { - error = - "This state.savepoints.dir value " - + savepointPath - + " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI."; - } else if (pathPart == null) { - error = - "This state.savepoints.dir value " - + savepointPath - + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data."; - } else if (pathPart.isEmpty() || "/".equals(pathPart)) { - error = - "This state.savepoints.dir value " - + savepointPath - + " Cannot use the root directory for checkpoints."; + return "This state.savepoints.dir value " + + savepointPath + + " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI."; + } + if (pathPart == null) { + return "This state.savepoints.dir value " + + savepointPath + + " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data."; } - return error; + if (pathPart.isEmpty() || "/".equals(pathPart)) { + return "This state.savepoints.dir value " + + savepointPath + + " Cannot use the root directory for checkpoints."; + } + return null; } else { return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application"; } @@ -1552,6 +1560,14 @@ public void start(Application appParam, boolean auto) throws Exception { extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql()); } + TrackId trackId; + if (isKubernetesApp(application)) { + trackId = toTrackId(application); + KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId()); + } else { + trackId = null; + } + KubernetesSubmitParam kubernetesSubmitParam = new KubernetesSubmitParam( application.getClusterId(), @@ -1613,7 +1629,7 @@ public void start(Application appParam, boolean auto) throws Exception { app.setOptionState(OptionState.NONE.getValue()); updateById(app); if (isKubernetesApp(app)) { - k8SFlinkTrackMonitor.unWatching(toTrackId(app)); + k8SFlinkTrackMonitor.unWatching(trackId); } else { FlinkRESTAPIWatcher.unWatching(appParam.getId()); } @@ -1649,7 +1665,7 @@ public void start(Application appParam, boolean auto) throws Exception { // if start completed, will be added task to tracking queue if (isKubernetesApp(application)) { application.setRelease(ReleaseState.DONE.get()); - k8SFlinkTrackMonitor.doWatching(toTrackId(application)); + k8SFlinkTrackMonitor.doWatching(trackId); if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { String domainName = settingService.getIngressModeDefault(); if (StringUtils.isNotBlank(domainName)) { @@ -1748,7 +1764,7 @@ private void doStopped(Application appParam) { // re-tracking flink job on kubernetes and logging exception if (isKubernetesApp(application)) { TrackId id = toTrackId(application); - k8SFlinkTrackMonitor.unWatching(id); + KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId()); k8SFlinkTrackMonitor.doWatching(id); } else { FlinkRESTAPIWatcher.unWatching(application.getId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java index af3b816d56..d545800329 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java @@ -23,6 +23,7 @@ import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher; import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory; import org.apache.streampark.flink.kubernetes.FlinkTrackConfig; +import org.apache.streampark.flink.kubernetes.KubernetesRetriever; import org.apache.streampark.flink.kubernetes.enums.FlinkJobState; import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode; import org.apache.streampark.flink.kubernetes.model.TrackId; @@ -101,17 +102,16 @@ private List getK8sWatchingApps() { if (CollectionUtils.isEmpty(k8sApplication)) { return Lists.newArrayList(); } - // correct corrupted data - List correctApps = - k8sApplication.stream() - .filter(app -> !Bridge.toTrackId(app).isLegal()) - .collect(Collectors.toList()); - if (CollectionUtils.isNotEmpty(correctApps)) { - applicationService.saveOrUpdateBatch(correctApps); - } // filter out the application that should be tracking return k8sApplication.stream() - .filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()))) + .filter( + app -> { + boolean isEndState = + FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())); + boolean deploymentExists = + KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(), app.getClusterId()); + return !isEndState || deploymentExists; + }) .map(Bridge::toTrackId) .collect(Collectors.toList()); } @@ -121,6 +121,7 @@ public static class Bridge { // covert Application to TrackId public static TrackId toTrackId(@Nonnull Application app) { + Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum()); if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) { return TrackId.onApplication( diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala index 71ee9d6de0..79ba51812f 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala @@ -63,7 +63,7 @@ object KubernetesRetriever extends Logger { private val clusterClientServiceLoader = new DefaultClusterClientServiceLoader() /** get new flink cluster client of kubernetes mode */ - def newFinkClusterClient( + private def newFinkClusterClient( clusterId: String, @Nullable namespace: String, executeMode: FlinkK8sExecuteMode.Value): Option[ClusterClient[String]] = { @@ -110,7 +110,7 @@ object KubernetesRetriever extends Logger { * @param namespace * deployment namespace */ - def isDeploymentExists(name: String, namespace: String): Boolean = { + def isDeploymentExists(namespace: String, deploymentName: String): Boolean = { using(KubernetesRetriever.newK8sClient()) { client => client @@ -121,7 +121,7 @@ object KubernetesRetriever extends Logger { .list() .getItems .asScala - .exists(e => e.getMetadata.getName == name) + .exists(_.getMetadata.getName == deploymentName) }(_ => false) } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index 5495814f27..590cd806d4 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -55,7 +55,7 @@ object KubernetesDeploymentHelper extends Logger { } } - def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = { + def isDeploymentError(nameSpace: String, deploymentName: String): Boolean = { Try { val pods = getPods(nameSpace, deploymentName) val podStatus = pods.head.getStatus @@ -68,12 +68,7 @@ object KubernetesDeploymentHelper extends Logger { }.getOrElse(true) } - def getTheNumberOfTaskDeploymentRetries(nameSpace: String, deploymentName: String): Integer = { - val pods = getPods(nameSpace, deploymentName) - pods.head.getStatus.getContainerStatuses.head.getRestartCount - } - - def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean = { + private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Boolean = { using(KubernetesRetriever.newK8sClient()) { client => Try { @@ -86,7 +81,26 @@ object KubernetesDeploymentHelper extends Logger { } } - def isTheK8sConnectionNormal(): Boolean = { + private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Boolean = { + using(KubernetesRetriever.newK8sClient()) { + client => + Try { + val r = client + .configMaps() + .inNamespace(nameSpace) + .withLabel("app", deploymentName) + .delete + Boolean.unbox(r) + }.getOrElse(false) + } + } + + def delete(nameSpace: String, deploymentName: String): Unit = { + deleteDeployment(nameSpace, deploymentName) + deleteConfigMap(nameSpace, deploymentName) + } + + def checkConnection(): Boolean = { Try(new DefaultKubernetesClient) match { case Success(client) => client.close() @@ -125,20 +139,6 @@ object KubernetesDeploymentHelper extends Logger { }(error => throw error) } - def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean = { - using(KubernetesRetriever.newK8sClient()) { - client => - Try { - val r = client - .configMaps() - .inNamespace(nameSpace) - .withLabel("app", deploymentName) - .delete - Boolean.unbox(r) - }.getOrElse(false) - } - } - private[kubernetes] def getJobLog(jobId: String): String = { val tmpPath = SystemPropertyUtils.getTmpdir() s"$tmpPath/$jobId.log" diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala index fba47eee6d..e280122c8b 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala @@ -20,6 +20,8 @@ package org.apache.streampark.flink.kubernetes.model import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode +import java.lang.{Boolean => JavaBool} + import scala.util.Try /** tracking identifier for flink on kubernetes */ diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index 55f62f07fb..2879f17bb7 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -125,7 +125,12 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi watchController.trackIds.update(trackId) eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState)) } - if (FlinkJobState.isEndState(jobState.jobState)) { + + val deployExists = KubernetesRetriever.isDeploymentExists( + trackId.namespace, + trackId.clusterId + ) + if (FlinkJobState.isEndState(jobState.jobState) && !deployExists) { // remove trackId from cache of job that needs to be untracked watchController.unWatching(trackId) if (trackId.executeMode == APPLICATION) { @@ -213,9 +218,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi } else { jobDetails.map { d => - TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d.toJobStatusCV( - pollEmitTime, - System.currentTimeMillis) + TrackId.onSession(namespace, clusterId, appId, d.jid, groupId) -> d + .toJobStatusCV(pollEmitTime, System.currentTimeMillis) } } } @@ -284,21 +288,27 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi // infer from k8s deployment and event val latest: JobStatusCV = watchController.jobStatuses.get(trackId) - logger.info( - s"Query the local cache result:${watchController.canceling.has(trackId).toString},trackId ${trackId.toString}.") - val jobState = { - if (watchController.canceling.has(trackId)) FlinkJobState.CANCELED - else { + + val jobState = trackId match { + case id if watchController.canceling.has(id) => + logger.info(s"trackId ${trackId.toString} is canceling") + FlinkJobState.CANCELED + case _ => // whether deployment exists on kubernetes cluster - val isDeployExists = - KubernetesRetriever.isDeploymentExists(trackId.clusterId, trackId.namespace) - val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges( + val deployExists = KubernetesRetriever.isDeploymentExists( trackId.namespace, - trackId.clusterId) - val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal() + trackId.clusterId + ) - if (isDeployExists) { - if (!deployStateOfTheError) { + val deployError = KubernetesDeploymentHelper.isDeploymentError( + trackId.namespace, + trackId.clusterId + ) + + val isConnection = KubernetesDeploymentHelper.checkConnection() + + if (deployExists) { + if (!deployError) { logger.info("Task Enter the initialization process.") FlinkJobState.K8S_INITIALIZING } else if (isConnection) { @@ -307,7 +317,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi trackId.namespace, trackId.clusterId, trackId.jobId) - KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId) FlinkJobState.FAILED } else { inferSilentOrLostFromPreCache(latest) @@ -318,8 +327,6 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi } else { inferSilentOrLostFromPreCache(latest) } - - } } val jobStatusCV = JobStatusCV(