Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.webank.wedatasphere.streamis.jobmanager.manager.utils.JobUtils

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

/**
* Created by enjoyyin on 2021/9/23.
Expand All @@ -50,20 +51,24 @@ class FlinkJarStreamisStartupParamsTransform extends Transform {
startupMap.put("flink.app.main.class.jar", transformJobContent.getMainClassJar.getFileName)
startupMap.put("flink.app.main.class.jar.bml.json",
JsonUtils.jackson.writeValueAsString(getStreamisFileContent(transformJobContent.getMainClassJar)))
val classpathFiles = if(transformJobContent.getDependencyJars != null && transformJobContent.getResources != null) {
startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(","))
transformJobContent.getDependencyJars.asScala ++ transformJobContent.getResources.asScala
} else if(transformJobContent.getDependencyJars != null) {
startupMap.put("flink.app.user.class.path", transformJobContent.getDependencyJars.asScala.map(_.getFileName).mkString(","))
transformJobContent.getDependencyJars.asScala
} else if(transformJobContent.getResources != null) {
startupMap.put("flink.yarn.ship-directories", transformJobContent.getResources.asScala.map(_.getFileName).mkString(","))
transformJobContent.getResources.asScala

/**
* Notice : "flink.app.user.class.path" equals to PipelineOptions.CLASSPATHS in Flink
* paths must specify a protocol (e.g. file://) and be accessible on all nodes
* so we use "flink.yarn.ship-directories" instead
*/
var classPathFiles = Option(transformJobContent.getDependencyJars) match {
case Some(list) => list.asScala
case _ => mutable.Buffer[StreamisFile]()
}
Option(transformJobContent.getResources) match {
case Some(list) => classPathFiles = classPathFiles ++ list.asScala
case _ => // Do nothing
}
else mutable.Buffer[StreamisFile]()
if(classpathFiles.nonEmpty)
startupMap.put("flink.yarn.ship-directories", classPathFiles.map(_.getFileName).mkString(","))
if(classPathFiles.nonEmpty)
startupMap.put("flink.app.user.class.path.bml.json",
JsonUtils.jackson.writeValueAsString(classpathFiles.map(getStreamisFileContent).asJava))
JsonUtils.jackson.writeValueAsString(classPathFiles.map(getStreamisFileContent).asJava))
if(transformJobContent.getHdfsJars != null)
startupMap.put("flink.user.lib.path", transformJobContent.getHdfsJars.asScala.mkString(","))
val params = if(job.getParams == null) new util.HashMap[String, Any] else job.getParams
Expand Down