Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] k8s job state monitor bug fixed #3458

Merged
merged 5 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ object Utils extends Logger {
new JarInputStream(new BufferedInputStream(new FileInputStream(jarFile))).getManifest
}

def getJarManClass(jarFile: File): String = {
val manifest = getJarManifest(jarFile)
val mainAttr = manifest.getMainAttributes
Option(mainAttr.getValue("Main-Class"))
.getOrElse(Option(mainAttr.getValue("program-class")).orNull)
}

def copyProperties(original: Properties, target: Properties): Unit =
original.foreach(x => target.put(x._1, x._2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public interface ApplicationService extends IService<Application> {

boolean checkAlter(Application application);

Map<String, String> getRumtimeConfig(Long id);

void updateRelease(Application application);

List<Application> getByProjectId(Long id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
Expand All @@ -152,7 +153,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.jar.Manifest;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -403,11 +403,6 @@ public Boolean delete(Application paramApp) {
// 8) remove app
removeApp(application);

if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
} else {
FlinkRESTAPIWatcher.unWatching(paramApp.getId());
}
return true;
}

Expand Down Expand Up @@ -460,6 +455,23 @@ public boolean checkAlter(Application application) {
return cancelUserId != -1 && cancelUserId != appUserId;
}

@Override
public Map<String, String> getRumtimeConfig(Long id) {
Application application = getById(id);
if (application != null && application.getVersionId() != null) {
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
if (flinkEnv != null) {
File yaml = new File(flinkEnv.getFlinkHome().concat("/conf/flink-conf.yaml"));
Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(yaml);
Map<String, String> dynamicConf =
PropertiesUtils.extractDynamicPropertiesAsJava(application.getDynamicProperties());
config.putAll(dynamicConf);
return config;
}
}
return Collections.emptyMap();
}

private void removeApp(Application application) {
Long appId = application.getId();
removeById(appId);
Expand Down Expand Up @@ -695,7 +707,7 @@ public AppExistsState checkExists(Application appParam) {
}
// check whether clusterId, namespace, jobId on kubernetes
else if (ExecutionMode.isKubernetesMode(appParam.getExecutionMode())
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(appParam))) {
&& k8SFlinkTrackMonitor.checkIsInRemoteCluster(toTrackId(app))) {
return AppExistsState.IN_KUBERNETES;
}
}
Expand Down Expand Up @@ -1135,10 +1147,7 @@ public void forcedStop(Application app) {
if (isKubernetesApp(application)) {
KubernetesDeploymentHelper.watchPodTerminatedLog(
application.getK8sNamespace(), application.getJobName(), application.getJobId());
KubernetesDeploymentHelper.deleteTaskDeployment(
application.getK8sNamespace(), application.getJobName());
KubernetesDeploymentHelper.deleteTaskConfigMap(
application.getK8sNamespace(), application.getJobName());
KubernetesDeploymentHelper.delete(application.getK8sNamespace(), application.getJobName());
}
if (startFuture != null) {
startFuture.cancel(true);
Expand Down Expand Up @@ -1217,8 +1226,7 @@ public String getMain(Application application) {
project.getDistHome().getAbsolutePath().concat("/").concat(application.getModule());
jarFile = new File(modulePath, application.getJar());
}
Manifest manifest = Utils.getJarManifest(jarFile);
return manifest.getMainAttributes().getValue("Main-Class");
return Utils.getJarManClass(jarFile);
}

@Override
Expand Down Expand Up @@ -1322,6 +1330,8 @@ public void cancel(Application appParam) throws Exception {

cancelFutureMap.put(application.getId(), cancelFuture);

TrackId trackId = isKubernetesApp(application) ? toTrackId(application) : null;

cancelFuture.whenComplete(
(cancelResponse, throwable) -> {
cancelFutureMap.remove(application.getId());
Expand All @@ -1345,9 +1355,8 @@ public void cancel(Application appParam) throws Exception {
}
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
k8SFlinkTrackMonitor.doWatching(id);
KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
k8SFlinkTrackMonitor.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
}
Expand All @@ -1373,7 +1382,8 @@ public void cancel(Application appParam) throws Exception {
}

if (isKubernetesApp(application)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(application));
KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
k8SFlinkTrackMonitor.unWatching(trackId);
}
});
}
Expand All @@ -1389,24 +1399,22 @@ public String checkSavepointPath(Application appParam) throws Exception {
final URI uri = URI.create(savepointPath);
final String scheme = uri.getScheme();
final String pathPart = uri.getPath();
String error = null;
if (scheme == null) {
error =
"This state.savepoints.dir value "
+ savepointPath
+ " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI.";
} else if (pathPart == null) {
error =
"This state.savepoints.dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
} else if (pathPart.isEmpty() || "/".equals(pathPart)) {
error =
"This state.savepoints.dir value "
+ savepointPath
+ " Cannot use the root directory for checkpoints.";
return "This state.savepoints.dir value "
+ savepointPath
+ " scheme (hdfs://, file://, etc) of is null. Please specify the file system scheme explicitly in the URI.";
}
if (pathPart == null) {
return "This state.savepoints.dir value "
+ savepointPath
+ " path part to store the checkpoint data in is null. Please specify a directory path for the checkpoint data.";
}
return error;
if (pathPart.isEmpty() || "/".equals(pathPart)) {
return "This state.savepoints.dir value "
+ savepointPath
+ " Cannot use the root directory for checkpoints.";
}
return null;
} else {
return "When custom savepoint is not set, state.savepoints.dir needs to be set in properties or flink-conf.yaml of application";
}
Expand Down Expand Up @@ -1552,6 +1560,14 @@ public void start(Application appParam, boolean auto) throws Exception {
extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
}

TrackId trackId;
if (isKubernetesApp(application)) {
trackId = toTrackId(application);
KubernetesDeploymentHelper.delete(trackId.namespace(), trackId.clusterId());
} else {
trackId = null;
}

KubernetesSubmitParam kubernetesSubmitParam =
new KubernetesSubmitParam(
application.getClusterId(),
Expand Down Expand Up @@ -1613,7 +1629,7 @@ public void start(Application appParam, boolean auto) throws Exception {
app.setOptionState(OptionState.NONE.getValue());
updateById(app);
if (isKubernetesApp(app)) {
k8SFlinkTrackMonitor.unWatching(toTrackId(app));
k8SFlinkTrackMonitor.unWatching(trackId);
} else {
FlinkRESTAPIWatcher.unWatching(appParam.getId());
}
Expand Down Expand Up @@ -1649,7 +1665,7 @@ public void start(Application appParam, boolean auto) throws Exception {
// if start completed, will be added task to tracking queue
if (isKubernetesApp(application)) {
application.setRelease(ReleaseState.DONE.get());
k8SFlinkTrackMonitor.doWatching(toTrackId(application));
k8SFlinkTrackMonitor.doWatching(trackId);
if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
String domainName = settingService.getIngressModeDefault();
if (StringUtils.isNotBlank(domainName)) {
Expand Down Expand Up @@ -1748,7 +1764,7 @@ private void doStopped(Application appParam) {
// re-tracking flink job on kubernetes and logging exception
if (isKubernetesApp(application)) {
TrackId id = toTrackId(application);
k8SFlinkTrackMonitor.unWatching(id);
KubernetesDeploymentHelper.delete(id.namespace(), id.clusterId());
k8SFlinkTrackMonitor.doWatching(id);
} else {
FlinkRESTAPIWatcher.unWatching(application.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcher;
import org.apache.streampark.flink.kubernetes.FlinkK8sWatcherFactory;
import org.apache.streampark.flink.kubernetes.FlinkTrackConfig;
import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState;
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
import org.apache.streampark.flink.kubernetes.model.TrackId;
Expand Down Expand Up @@ -101,17 +102,16 @@ private List<TrackId> getK8sWatchingApps() {
if (CollectionUtils.isEmpty(k8sApplication)) {
return Lists.newArrayList();
}
// correct corrupted data
List<Application> correctApps =
k8sApplication.stream()
.filter(app -> !Bridge.toTrackId(app).isLegal())
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(correctApps)) {
applicationService.saveOrUpdateBatch(correctApps);
}
// filter out the application that should be tracking
return k8sApplication.stream()
.filter(app -> !FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum())))
.filter(
app -> {
boolean isEndState =
FlinkJobState.isEndState(toK8sFlinkJobState(app.getFlinkAppStateEnum()));
boolean deploymentExists =
KubernetesRetriever.isDeploymentExists(app.getK8sNamespace(), app.getClusterId());
return !isEndState || deploymentExists;
})
.map(Bridge::toTrackId)
.collect(Collectors.toList());
}
Expand All @@ -121,6 +121,7 @@ public static class Bridge {

// covert Application to TrackId
public static TrackId toTrackId(@Nonnull Application app) {

Enumeration.Value mode = FlinkK8sExecuteMode.of(app.getExecutionModeEnum());
if (FlinkK8sExecuteMode.APPLICATION().equals(mode)) {
return TrackId.onApplication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object KubernetesRetriever extends Logger {
private val clusterClientServiceLoader = new DefaultClusterClientServiceLoader()

/** get new flink cluster client of kubernetes mode */
def newFinkClusterClient(
private def newFinkClusterClient(
clusterId: String,
@Nullable namespace: String,
executeMode: FlinkK8sExecuteMode.Value): Option[ClusterClient[String]] = {
Expand Down Expand Up @@ -110,7 +110,7 @@ object KubernetesRetriever extends Logger {
* @param namespace
* deployment namespace
*/
def isDeploymentExists(name: String, namespace: String): Boolean = {
def isDeploymentExists(namespace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
client =>
client
Expand All @@ -121,7 +121,7 @@ object KubernetesRetriever extends Logger {
.list()
.getItems
.asScala
.exists(e => e.getMetadata.getName == name)
.exists(_.getMetadata.getName == deploymentName)
}(_ => false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object KubernetesDeploymentHelper extends Logger {
}
}

def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = {
def isDeploymentError(nameSpace: String, deploymentName: String): Boolean = {
Try {
val pods = getPods(nameSpace, deploymentName)
val podStatus = pods.head.getStatus
Expand All @@ -68,12 +68,7 @@ object KubernetesDeploymentHelper extends Logger {
}.getOrElse(true)
}

def getTheNumberOfTaskDeploymentRetries(nameSpace: String, deploymentName: String): Integer = {
val pods = getPods(nameSpace, deploymentName)
pods.head.getStatus.getContainerStatuses.head.getRestartCount
}

def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean = {
private[this] def deleteDeployment(nameSpace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
Expand All @@ -86,7 +81,26 @@ object KubernetesDeploymentHelper extends Logger {
}
}

def isTheK8sConnectionNormal(): Boolean = {
private[this] def deleteConfigMap(nameSpace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
val r = client
.configMaps()
.inNamespace(nameSpace)
.withLabel("app", deploymentName)
.delete
Boolean.unbox(r)
}.getOrElse(false)
}
}

def delete(nameSpace: String, deploymentName: String): Unit = {
deleteDeployment(nameSpace, deploymentName)
deleteConfigMap(nameSpace, deploymentName)
}

def checkConnection(): Boolean = {
Try(new DefaultKubernetesClient) match {
case Success(client) =>
client.close()
Expand Down Expand Up @@ -125,20 +139,6 @@ object KubernetesDeploymentHelper extends Logger {
}(error => throw error)
}

def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean = {
using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
val r = client
.configMaps()
.inNamespace(nameSpace)
.withLabel("app", deploymentName)
.delete
Boolean.unbox(r)
}.getOrElse(false)
}
}

private[kubernetes] def getJobLog(jobId: String): String = {
val tmpPath = SystemPropertyUtils.getTmpdir()
s"$tmpPath/$jobId.log"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.streampark.flink.kubernetes.model
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode

import java.lang.{Boolean => JavaBool}

import scala.util.Try

/** tracking identifier for flink on kubernetes */
Expand Down
Loading
Loading