Spark_SubmitSource


1. Yarn Client Submission Source Code

Cherish your health: don't grind through more source code than you have to!

1. Submitting the Job

==Program entry point==

spark-submit.cmd

==Launched via the script==

bin/spark-submit \
--class WordCount \
--master yarn \
--deploy-mode cluster \
./wordCount.jar \
./input ./output

2. SparkSubmit

shell

spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10

java -cp command (what spark-submit actually launches)

"Java\jdk1.8.0_131\bin\java" -cp "C:\spark-3.0.0-bin-hadoop3.2\bin\..\conf\;
\spark-3.0.0-bin-hadoop3.2\bin\..\jars\*" 
-Xmx1g org.apache.spark.deploy.SparkSubmit 
--master local[2] 
--class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-3.0.0.jar 10

java -cp is shorthand for -classpath.

Launching SparkSubmit's main method

  1. java -cp
  2. Start a new JVM
  3. Start the SparkSubmit process
  4. Program entry point: SparkSubmit.main (a minimal launch sketch follows the list)
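
A minimal sketch, not Spark's actual launcher code, of what steps 1-3 amount to: assemble a `java -cp` command line and start it as a child process whose main class is SparkSubmit. The classpath and application arguments below are illustrative placeholders.

object LaunchSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical classpath; the real scripts build it from conf/ and jars/*.
    val classpath = "conf/:jars/*"
    // Build the same kind of command line that spark-submit ultimately runs.
    val pb = new ProcessBuilder(
      "java", "-cp", classpath, "-Xmx1g",
      "org.apache.spark.deploy.SparkSubmit",
      "--master", "local[2]",
      "--class", "org.apache.spark.examples.SparkPi",
      "examples/jars/spark-examples_2.12-3.0.0.jar", "10")
    // Steps 2 and 3: a new JVM process starts; its entry point is SparkSubmit.main.
    val proc = pb.inheritIO().start()
    proc.waitFor()
  }
}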

In org.apache.spark.deploy.SparkSubmit, find the SparkSubmit companion object and locate its main method.

main

org/apache/spark/deploy/SparkSubmit.scala

override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>
      // create an anonymous SparkSubmit subclass; its parseArguments builds the argument-parsing object
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {...}
      }
      ...// logging overrides
    }
    submit.doSubmit(args)// call doSubmit
  }

doSubmit

org/apache/spark/deploy/SparkSubmit.scala

def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    ...
    val appArgs = parseArguments(args)// parse the arguments ----------------->parseArguments; appArgs is the parsed, encapsulated argument object
    ...
    appArgs.action match {// dispatch on the action field of the parsed appArgs
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)//---------------->submit
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
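
SparkSubmitAction is a Scala Enumeration, and the match above is a plain dispatch on its value. A stripped-down, self-contained sketch of the same pattern (simplified names, not the real SparkSubmitAction):

object ActionDispatchSketch {
  // Simplified stand-in for SparkSubmitAction.
  object Action extends Enumeration {
    val Submit, Kill, RequestStatus, PrintVersion = Value
  }

  // Mirrors the appArgs.action match in doSubmit, returning a label instead of acting.
  def dispatch(action: Action.Value): String = action match {
    case Action.Submit        => "submit(...)"
    case Action.Kill          => "kill(...)"
    case Action.RequestStatus => "requestStatus(...)"
    case Action.PrintVersion  => "printVersion()"
  }

  def main(args: Array[String]): Unit =
    println(dispatch(Action.Submit)) // prints: submit(...)
}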

doSubmit/parseArguments

org/apache/spark/deploy/SparkSubmit.scala

protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    new SparkSubmitArguments(args)
  }

doSubmit/parseArguments/SparkSubmitArguments

org/apache/spark/deploy/SparkSubmitArguments.scala

private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
  extends SparkSubmitArgumentsParser with Logging {
  var master: String = null
  var deployMode: String = null
  var executorMemory: String = null
  var executorCores: String = null
  ...// many more option fields

  /** Default properties present in the currently defined defaults file. */
  lazy val defaultSparkProperties: HashMap[String, String] = {
    val defaultProperties = new HashMap[String, String]()
    ...
    defaultProperties// the configuration properties are held in a HashMap
  }

  // Set parameters from command line arguments
  parse(args.asJava)// parse the arguments ---------------------------->
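
parse is defined in the Java class SparkSubmitOptionParser, so the Scala Seq[String] is converted with asJava before being handed over. A tiny sketch of that Scala-to-Java bridge (using scala.collection.JavaConverters, the converter import for Scala 2.12; javaSideParse is a made-up method for the example):

import java.util.{List => JList}
import scala.collection.JavaConverters._

object AsJavaSketch {
  // Stands in for a Java-side method such as SparkSubmitOptionParser.parse(List<String>).
  def javaSideParse(args: JList[String]): Unit =
    println(s"received ${args.size()} arguments")

  def main(args: Array[String]): Unit = {
    val scalaArgs: Seq[String] = Seq("--master", "yarn", "--class", "WordCount")
    javaSideParse(scalaArgs.asJava) // Seq[String] -> java.util.List[String]
  }
}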

doSubmit/parseArguments/SparkSubmitArguments/parse

org/apache/spark/launcher/SparkSubmitOptionParser.java

==Note: if you open the compiled .class, the decompiler produces artifacts such as `int idx = false;`; the Java source shown below does not have them==

protected final void parse(List<String> args) {// receives the option strings from SparkSubmitArguments
        Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

        int idx = 0;
        for (idx = 0; idx < args.size(); idx++) {
            String arg = args.get(idx);
            String value = null;

            Matcher m = eqSeparatedOpt.matcher(arg);
            ...// look up the option name; if no "=value" was attached, take the value from the next element
                    value = args.get(idx);
                }
                if (!handle(name, value)) { break; }//----------------------> handle
            } else {...}
        }
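
The regex (--[^=]+)=(.+) only matches options written as --name=value; options written as two tokens (--name value) fall through and pick up the next list element as their value. A standalone sketch of the regex step:

import java.util.regex.Pattern

object EqOptSketch {
  def main(args: Array[String]): Unit = {
    val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
    for (arg <- Seq("--master=yarn", "--class", "WordCount")) {
      val m = eqSeparatedOpt.matcher(arg)
      if (m.matches()) {
        // "--master=yarn" splits into name "--master" and value "yarn".
        println(s"name=${m.group(1)} value=${m.group(2)}")
      } else {
        // "--class" and "WordCount" do not match; the parser pairs them up itself.
        println(s"no '=' in: $arg")
      }
    }
  }
}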

doSubmit/parseArguments/SparkSubmitArguments/parse/handle

org/apache/spark/deploy/SparkSubmitArguments.scala

override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case NAME =>
        name = value
      case MASTER =>
        master = value
     ...
      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
          error("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value
      case NUM_EXECUTORS =>
        numExecutors = value
      case TOTAL_EXECUTOR_CORES =>
        totalExecutorCores = value
      case EXECUTOR_CORES =>
        executorCores = value
      case EXECUTOR_MEMORY =>
        executorMemory = value
      case DRIVER_MEMORY =>
        driverMemory = value
      case DRIVER_CORES =>
        driverCores = value
      ...
    }// pattern matching over all the recognized option flags
    action != SparkSubmitAction.PRINT_VERSION
  }
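
parse (in the Java parent class) drives the loop, while handle (overridden here in SparkSubmitArguments) decides what each option means, so the generic parser calls back into Spark-specific field assignments. A minimal sketch of that template-method shape, with simplified stand-ins rather than the real SparkSubmitOptionParser/SparkSubmitArguments:

object HandleSketch {
  // Stand-in for SparkSubmitOptionParser: a generic loop that delegates to handle.
  abstract class OptionParser {
    protected def handle(opt: String, value: String): Boolean

    final def parse(args: Seq[String]): Unit = {
      val pairs = args.grouped(2)
      var continue = true
      while (continue && pairs.hasNext) {
        pairs.next() match {
          case Seq(opt, value) => continue = handle(opt, value)
          case _               => continue = false
        }
      }
    }
  }

  // Stand-in for SparkSubmitArguments: stores the recognized values.
  class Args extends OptionParser {
    var master: String = _
    var deployMode: String = _

    override protected def handle(opt: String, value: String): Boolean = opt match {
      case "--master"      => master = value; true
      case "--deploy-mode" => deployMode = value; true
      case _               => false
    }
  }

  def main(argv: Array[String]): Unit = {
    val parsed = new Args
    parsed.parse(Seq("--master", "yarn", "--deploy-mode", "cluster"))
    println(s"master=${parsed.master} deployMode=${parsed.deployMode}")
  }
}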

doSubmit/submit

org/apache/spark/deploy/SparkSubmit.scala

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)//------------------------------->
            }
          })
        } catch {...}
      } else {
        runMain(args, uninitLog)//--------------------------------->
      }
    }
    ...
    doRunMain()// submit eventually invokes doRunMain ------------------------------->runMain
  }

doSubmit/submit/doRunMain/runMain

org/apache/spark/deploy/SparkSubmit.scala

/**
   * Run the main method of the child class using the submit arguments.
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   */
  private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)//------------------------------> prepare the launch environment
    ...// logging-related
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }
    var mainClass: Class[_] = null
    try {
      mainClass = Utils.classForName(childMainClass)// load the child main class by name via reflection
    } catch {...}

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]// reflectively instantiate it; in yarn cluster mode this creates the YarnClusterApplication
    } else {
      new JavaMainApplication(mainClass)// otherwise wrap an ordinary main class
    }
    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }
    try {
      app.start(childArgs.toArray, sparkConf)// start the application; in yarn cluster mode this is YarnClusterApplication
        //---------------------------------->
    } catch {...}
  }
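
The core of runMain is ordinary JVM reflection: resolve the class named by childMainClass, check whether it implements SparkApplication, and instantiate it through its no-arg constructor. A hedged, standalone sketch of that pattern with simplified types (AppLike and HelloApp are invented for the example and assume compilation in the default package):

// Simplified stand-in for SparkApplication.
trait AppLike { def start(args: Array[String]): Unit }

// A concrete application that is discovered only by its class name at runtime.
class HelloApp extends AppLike {
  override def start(args: Array[String]): Unit =
    println(s"HelloApp started with args: ${args.mkString(", ")}")
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    // Load the class by name, as runMain does with childMainClass.
    val mainClass: Class[_] = Class.forName("HelloApp")
    val app: AppLike =
      if (classOf[AppLike].isAssignableFrom(mainClass)) {
        // Same pattern as runMain: no-arg constructor, then cast to the trait.
        mainClass.getConstructor().newInstance().asInstanceOf[AppLike]
      } else {
        sys.error(s"$mainClass does not implement AppLike")
      }
    app.start(Array("10"))
  }
}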

doSubmit/submit/doRunMain/runMain/prepareSubmitEnvironment

org/apache/spark/deploy/SparkSubmit.scala

private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    // Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    var childMainClass = ""

    // Set the cluster manager
    val clusterManager: Int = args.master match {
      case "yarn" => YARN//判断环境为yarn环境------------------------>
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("k8s") => KUBERNETES
      case m if m.startsWith("local") => LOCAL
      case _ =>
        error("Master must either be yarn or start with spark, mesos, k8s, or local")
        -1
    }

doSubmit/submit/doRunMain/runMain/start

org/apache/spark/deploy/SparkApplication.scala

private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}
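
Every submission ends up behind this trait: in yarn cluster mode the implementation is YarnClusterApplication, while an ordinary user main class is wrapped in JavaMainApplication. For illustration only, a hypothetical implementation would look like the sketch below; note that SparkApplication is private[spark], so such a class has to live under the org.apache.spark package tree (MyClusterApplication is made up, not part of Spark):

// Illustrative only; real implementations include YarnClusterApplication.
package org.apache.spark.deploy

import org.apache.spark.SparkConf

class MyClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Hypothetical behavior: just echo what spark-submit handed over.
    println(s"user args = ${args.mkString(" ")}")
    println(s"master = ${conf.get("spark.master", "unset")}")
  }
}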

Press Ctrl+H in IDEA on SparkApplication to view its type hierarchy.

(Figure: SparkApplication type hierarchy)

(Figure: Yarn module source file structure)

Yarn

Client/start

org/apache/spark/deploy/yarn/Client.scala

org.apache.spark.deploy.yarn.YarnClusterApplication

private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()//--------------------------->
  }//--------------->Client
    //-------------->ClientArguments
}

Client/ClientArguments

org/apache/spark/deploy/yarn/ClientArguments.scala

private[spark] class ClientArguments(args: Array[String]) {
  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
  parseArgs(args.toList)//----------------------->
  private def parseArgs(inputArgs: List[String]): Unit = {//<---------------------
    var args = inputArgs
    while (!args.isEmpty) {
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail
        case ("--class") :: value :: tail =>
          userClass = value
          args = tail
        ....
      }
    }
   ...
  }
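
parseArgs uses the classic Scala idiom of pattern-matching a List of arguments against `flag :: value :: tail` and looping on the remaining tail. A self-contained sketch of the same idiom (the option names mirror the ones above):

object ArgListSketch {
  def main(argv: Array[String]): Unit = {
    var userJar: String = null
    var userClass: String = null

    var args = List("--jar", "./wordCount.jar", "--class", "WordCount")
    while (args.nonEmpty) {
      args match {
        case "--jar" :: value :: tail =>
          userJar = value
          args = tail
        case "--class" :: value :: tail =>
          userClass = value
          args = tail
        case unknown :: tail =>
          println(s"ignoring: $unknown")
          args = tail
        case Nil => // unreachable because of the loop condition
      }
    }
    println(s"userJar=$userJar userClass=$userClass")
  }
}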

YarnClient.createYarnClient

org/apache/spark/deploy/yarn/Client.scala

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {
  import Client._
  import YarnSparkHadoopUtil._
      // create the YARN client
  private val yarnClient = YarnClient.createYarnClient//----------------->YarnClient
      ...
  }
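
Client wraps Hadoop's YarnClient; run() ultimately submits the application to the ResourceManager through the standard YARN client API. A hedged sketch of that standard flow (plain Hadoop YARN API calls, greatly simplified; not Spark's actual Client code, which also builds the ApplicationMaster container launch context):

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object YarnClientSketch {
  def main(args: Array[String]): Unit = {
    // The same factory Spark's Client uses to talk to the ResourceManager.
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // Ask the ResourceManager for a new application and fill in its submission context;
    // Spark's Client sets the AM command, resources, and staged files here.
    val newApp = yarnClient.createApplication()
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("sketch-only")

    val appId = yarnClient.submitApplication(appContext)
    println(s"submitted application $appId")

    yarnClient.stop()
  }
}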

2. The ApplicationMaster Task

ApplicationMaster/main

org.apache.spark.deploy.yarn.ApplicationMaster

2. Spark Submission Flow


Author: Jinxin Li
Copyright notice: Unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit Jinxin Li when reposting.