1. Yarn Client Submission Source Code
Cherish your life: grind through less source code!
1. Submitting the Job
==Program entry point==
spark-submit.cmd
==Started via the script==
bin/spark-submit \
--class WordCount \
--master yarn \
--deploy-mode cluster \
./wordCount.jar \
./input ./output
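For context, the wordCount.jar submitted above might contain something like the following minimal application (illustrative only; the WordCount class itself is not part of these notes, and ./input / ./output are simply its two program arguments):
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical WordCount matching the submit command above.
// args(0) = input path, args(1) = output path; master/deploy-mode are supplied by spark-submit.
object WordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    sc.textFile(args(0))
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))
    sc.stop()
  }
}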
2.SparkSubmit
Shell command:
spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10
Which expands to the following java command (Windows paths from the author's machine):
"Java\jdk1.8.0_131\bin\java" -cp "C:\spark-3.0.0-bin-hadoop3.2\bin\..\conf\;C:\spark-3.0.0-bin-hadoop3.2\bin\..\jars\*"
-Xmx1g org.apache.spark.deploy.SparkSubmit
--master local[2]
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-3.0.0.jar 10
java -cp is shorthand for java -classpath.
So spark-submit simply launches SparkSubmit's main method:
- java -cp
- starts a new JVM
- starts a new process running SparkSubmit
- program entry point: SparkSubmit.main (see the conceptual sketch after this list)
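Conceptually, all the script does is assemble that java command line and start a fresh JVM with it; a rough sketch of the idea (not how the launcher is actually implemented):
import scala.sys.process._

// Conceptual sketch only: bin/spark-submit ultimately execs a plain `java -cp ...` command,
// so a new JVM is started whose entry point is org.apache.spark.deploy.SparkSubmit.main.
object SubmitLaunchSketch {
  def main(args: Array[String]): Unit = {
    val cmd = Seq(
      "java", "-cp", "conf/:jars/*", "-Xmx1g",
      "org.apache.spark.deploy.SparkSubmit",
      "--master", "local[2]",
      "--class", "org.apache.spark.examples.SparkPi",
      "examples/jars/spark-examples_2.12-3.0.0.jar", "10")
    cmd.!  // start the child process and wait for it to exit
  }
}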
In org.apache.spark.deploy.SparkSubmit, locate the SparkSubmit companion object and its main method.
main
org/apache/spark/deploy/SparkSubmit.scala
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>
    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {...} // main builds an anonymous SparkSubmit whose parser wraps SparkSubmitArguments
    }
    ... // logging overrides
  }
  submit.doSubmit(args) // call doSubmit
}
doSubmit
org/apache/spark/deploy/SparkSubmit.scala
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
...
val appArgs = parseArguments(args) // parse the arguments -----------------> parseArguments; appArgs is the parsed, wrapped argument object
...
appArgs.action match { // dispatch on the action field of the parsed arguments
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)//---------------->submit
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
doSubmit/parseArguments
org/apache/spark/deploy/SparkSubmit.scala
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
new SparkSubmitArguments(args)
}
doSubmit/parseArguments/SparkSubmitArguments
org/apache/spark/deploy/SparkSubmitArguments.scala
private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
extends SparkSubmitArgumentsParser with Logging {
var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
... // many more configuration fields
/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
val defaultProperties = new HashMap[String, String]()
...
defaultProperties // the collected configuration is kept in a HashMap
}
// Set parameters from command line arguments
parse(args.asJava) // parse the arguments ---------------------------->
...
}
doSubmit/parseArguments/SparkSubmitArguments/parse
org/apache/spark/launcher/SparkSubmitOptionParser.java
==Note: this class ships in the spark-launcher jar, so the IDE shows decompiled bytecode (hence oddities such as `int idx = false;`); tidied up, the parse loop looks like this==
protected final void parse(List<String> args) { // the raw spark-submit argument strings
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
  for (int idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;
    Matcher m = eqSeparatedOpt.matcher(arg);
    ... // split "--name=value" forms, then resolve the option name
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        ...
        idx++;
        value = args.get(idx);
      }
      if (!handle(name, value)) {break;} //---------------------->
    } else {...}
  }
  ...
}
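Note the division of labor: the launcher module owns the generic option-scanning loop, and each recognized option is delegated to handle(), which SparkSubmitArguments overrides (shown next). A minimal sketch of this template-method pattern, not Spark code:
// Minimal sketch of the parse()/handle() template-method pattern (not Spark code).
abstract class OptionParser {
  // Generic loop: walk "--opt value" pairs and delegate each one to handle().
  final def parse(args: List[String]): Unit = args match {
    case opt :: value :: rest if opt.startsWith("--") =>
      if (handle(opt, value)) parse(rest) // the subclass decides whether to keep parsing
    case _ => ()
  }
  protected def handle(opt: String, value: String): Boolean
}

class MyArguments extends OptionParser {
  var master: String = _
  override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case "--master" => master = value
      case _          => () // ignore unknown options in this sketch
    }
    true
  }
}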
doSubmit/parseArguments/SparkSubmitArguments/parse/handle
org/apache/spark/deploy/SparkSubmitArguments.scala
override protected def handle(opt: String, value: String): Boolean = {
opt match {
case NAME =>
name = value
case MASTER =>
master = value
...
case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
error("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
case NUM_EXECUTORS =>
numExecutors = value
case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
case EXECUTOR_CORES =>
executorCores = value
case EXECUTOR_MEMORY =>
executorMemory = value
case DRIVER_MEMORY =>
driverMemory = value
case DRIVER_CORES =>
driverCores = value
...
} // each recognized option stores its value in the corresponding field
action != SparkSubmitAction.PRINT_VERSION // handle returns true (keep parsing) unless --version set the action to PRINT_VERSION
}
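For the WordCount submission at the top of these notes, once parse()/handle() have run, the SparkSubmitArguments instance roughly ends up holding (illustrative values; the field names are the ones declared in SparkSubmitArguments):
appArgs.master          // "yarn"
appArgs.deployMode      // "cluster"
appArgs.mainClass       // "WordCount"                          (from --class)
appArgs.primaryResource // the resolved path of ./wordCount.jar (the application jar)
appArgs.childArgs       // ArrayBuffer("./input", "./output")   (the application's own arguments)
appArgs.action          // SparkSubmitAction.SUBMIT             (the default when nothing else is requested)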
doSubmit/submit
org/apache/spark/deploy/SparkSubmit.scala
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) { // if --proxy-user was given, run the main class as that user via UserGroupInformation.doAs
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(args, uninitLog)//------------------------------->
}
})
} catch {...}
} else {
runMain(args, uninitLog)//--------------------------------->
}
}
... // (special handling for standalone-cluster REST submission elided)
doRunMain() // submit finishes by invoking doRunMain
}
doSubmit/submit/doRunMain/runMain
org/apache/spark/deploy/SparkSubmit.scala
/**
* Run the main method of the child class using the submit arguments.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*/
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)//------------------------------> prepare the submit environment
... // logging
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass) // load the child main class by reflection
} catch {...}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication] // in yarn cluster mode this reflectively creates YarnClusterApplication
} else { // an ordinary main() class gets wrapped instead
new JavaMainApplication(mainClass)
}
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
app.start(childArgs.toArray, sparkConf) // start the application; in yarn cluster mode this is YarnClusterApplication.start
//---------------------------------->
} catch {...}
}
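The reflection steps above (classForName, isAssignableFrom, getConstructor().newInstance()) are plain JVM reflection. A standalone sketch of the same pattern, not Spark code:
// Standalone sketch of the pattern runMain uses (not Spark code): load a class by name,
// and either instantiate it as a known interface or fall back to invoking its main() method.
trait Application { def start(args: Array[String]): Unit }

class Hello extends Application {
  override def start(args: Array[String]): Unit = println("hello " + args.mkString(" "))
}

object ReflectLaunch {
  def main(args: Array[String]): Unit = {
    val mainClass = Class.forName("Hello") // Spark uses Utils.classForName(childMainClass)
    val app: Application =
      if (classOf[Application].isAssignableFrom(mainClass)) {
        mainClass.getConstructor().newInstance().asInstanceOf[Application]
      } else {
        // Spark wraps an ordinary main() class in JavaMainApplication at this point
        new Application {
          def start(a: Array[String]): Unit =
            mainClass.getMethod("main", classOf[Array[String]]).invoke(null, a)
        }
      }
    app.start(Array("world"))
  }
}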
doSubmit/submit/doRunMain/runMain/prepareSubmitEnvironment
org/apache/spark/deploy/SparkSubmit.scala
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
var childMainClass = ""
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN//判断环境为yarn环境------------------------>
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
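The match above only picks the cluster manager. Further down in the same method, the yarn-cluster branch is what drives everything that follows: it points childMainClass at YarnClusterApplication and repackages the user's options as --jar / --class / --arg child arguments (abridged; see prepareSubmitEnvironment in SparkSubmit.scala for the exact code):
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS // "org.apache.spark.deploy.yarn.YarnClusterApplication"
  ...
  if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
    childArgs += ("--jar", args.primaryResource)
  }
  childArgs += ("--class", args.mainClass)
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}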
doSubmit/submit/doRunMain/runMain/start
org/apache/spark/deploy/SparkApplication.scala
private[spark] trait SparkApplication {
def start(args: Array[String], conf: SparkConf): Unit
}
Use Ctrl+H in the IDE to view the implementation hierarchy of SparkApplication.
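Anything launched through runMain just needs to implement this trait. A toy implementation (illustrative only; the trait is private[spark], so real implementors such as YarnClusterApplication and JavaMainApplication live inside Spark's own packages):
package org.apache.spark.deploy.toy // placed under org.apache.spark because SparkApplication is private[spark]

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication

// Toy implementation (illustrative only): runMain instantiates such a class reflectively
// and calls start() with the prepared child arguments and SparkConf.
class HelloApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    println("started with args: " + args.mkString(", "))
  }
}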
File layout of the Yarn source module
Client/start
org/apache/spark/deploy/yarn/Client.scala
org.apache.spark.deploy.yarn.YarnClusterApplication
private[spark] class YarnClusterApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
new Client(new ClientArguments(args), conf, null).run()//--------------------------->
}//--------------->Client
//-------------->ClientArguments
}
Client/ClientArguments
org/apache/spark/deploy/yarn/ClientArguments.scala
private[spark] class ClientArguments(args: Array[String]) {
var userJar: String = null
var userClass: String = null
var primaryPyFile: String = null
var primaryRFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
parseArgs(args.toList)//----------------------->
private def parseArgs(inputArgs: List[String]): Unit = {//<---------------------
var args = inputArgs
while (!args.isEmpty) {
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail
case ("--class") :: value :: tail =>
userClass = value
args = tail
....
}
}
...
}
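Continuing the WordCount example: with the --jar / --class / --arg child arguments built in prepareSubmitEnvironment (see the excerpt earlier), parseArgs leaves the ClientArguments roughly as (illustrative values):
clientArgs.userJar   // "./wordCount.jar"                   (from --jar)
clientArgs.userClass // "WordCount"                         (from --class)
clientArgs.userArgs  // ArrayBuffer("./input", "./output")  (one --arg per application argument)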
YarnClient.createYarnClient
org/apache/spark/deploy/yarn/Client.scala
private[spark] class Client(
val args: ClientArguments,
val sparkConf: SparkConf,
val rpcEnv: RpcEnv)
extends Logging {
import Client._
import YarnSparkHadoopUtil._
// create the client used to communicate with YARN
private val yarnClient = YarnClient.createYarnClient//----------------->YarnClient
....
}
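From here the client actually talks to YARN. An abridged sketch of what Client.run() does in Spark 3.0.0 (see Client.scala for the full logic); this is where the ApplicationMaster of the next section comes from:
def run(): Unit = {
  this.appId = submitApplication() // submit to YARN, then monitor and report the application state
  ...
}

def submitApplication(): ApplicationId = {
  ...
  yarnClient.init(hadoopConf) // connect to the ResourceManager
  yarnClient.start()
  val newApp = yarnClient.createApplication() // ask YARN for a new application id
  val newAppResponse = newApp.getNewApplicationResponse()
  appId = newAppResponse.getApplicationId()
  ...
  // Build the AM container: in cluster mode its launch command is essentially
  // `java ... org.apache.spark.deploy.yarn.ApplicationMaster --class WordCount ...`
  val containerContext = createContainerLaunchContext(newAppResponse)
  val appContext = createApplicationSubmissionContext(newApp, containerContext)
  yarnClient.submitApplication(appContext) // hand the application over to YARN
  ...
  appId
}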
2. The ApplicationMaster
ApplicationMaster/main
org.apache.spark.deploy.yarn.ApplicationMaster