Spark_SparkCore


0x1 spark概述

1.历史

在之前的学习中,Hadoop的MapReduce是大家广为熟知的计算框架,那为什么咱们还要学习新的计算框架Spark呢,这里就不得不提到Spark和Hadoop的关系。

首先从时间节点上来看:

Hadoop

  1. 2006年1月,Doug Cutting加入Yahoo,领导Hadoop的开发
  2. 2008年1月,Hadoop成为Apache顶级项目
  3. 2011年1.0正式发布
  4. 2012年3月稳定版发布
  5. 2013年10月发布2.X (Yarn)版本

Spark

  1. 2009年,Spark诞生于伯克利大学的AMPLab实验室
  2. 2010年,伯克利大学正式开源了Spark项目
  3. 2013年6月,Spark成为了Apache基金会下的项目
  4. 2014年2月,Spark以飞快的速度成为了Apache的顶级项目
  5. 2015年至今,Spark变得愈发火爆,大量的国内公司开始重点部署或者使用Spark

2.Spark核心模块

spark核心模块

Spark Core

Spark Core中提供了Spark最基础与最核心的功能,Spark其他的功能如:Spark SQL,Spark Streaming,GraphX, MLlib都是在Spark Core的基础上进行扩展的

Spark SQL

Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。

Spark Streaming

Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API。

Spark MLlib

MLlib是Spark提供的一个机器学习算法库。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语。

Spark GraphX

GraphX是Spark面向图计算提供的框架与算法库。

3.入门wordCount

object wordCountTest {
  def main(args: Array[String]): Unit = {
    //spark标准获取流程
    val sparkConf: SparkConf = new SparkConf().setAppName("wordCount").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)
    //创建RDD,使用textFile的方式创建RDD
    val line: RDD[String] = sc.textFile("./word.txt", 1)
    val result: RDD[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    //打印
    result.collect().foreach(println)
    sc.close()
  }
}

4.spark运行模式

Spark运行环境 用法
Local模式 测试
Standalone模式 独立部署Matser-Worker
自身提供计算资源,降低了耦合性
Yarn模式 提供集群模式
K8S模式
Mesos模式 Twitter
Windows模式

5.spark端口号

Spark端口号 详解 Hadoop
内部通信:==7077== Spark Master内部通信服务端口号 8020/9000
Master资源监控:8080/改==8989== Standalone模式下,Spark Master Web端口号 9870
Spark-Shell监控:==4040== Spark查看当前Spark-shell运行任务情况端口号
Spark使用HDFS端口:==8020==
历史服务器UI端口:==18080== Spark历史服务器端口号 19888
Yarn:==8088== Hadoop YARN任务运行情况查看端口号 8088

6.Spark运行组件

Spark框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。

Spark运行组件

6.1Spark Executor

集群中运行在==工作节点(Worker)中的一个JVM进程==,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点Executor的内存大小和使用的虚拟CPU核(Core)数量。

名称 说明
–num-executors 配置Executor的数量
–executor-memory 配置每个Executor的内存大小
–executor-cores 配置每个Executor的虚拟CPU core数量
Executor如果是3核,设备是单核,模拟的多线程操作,其实是并发操作

6.2并行与并发

我们会给Executor分配虚拟的核心数量,如果核心不够会触发多线程操作,并发

如果核心够用,则进行并行操作,可以进行配置

6.3有向无环图

表示一种依赖关系,依赖关系形成的拓扑图形称为DAG,有向无环图

graph LR
A-->B
B-->C
B-->D
D--禁止-->A

如图,D向A会形成无环图,有环会形成死循环(与maven类似)

7.Yarn Cluster任务提交流程

核心:分两大块 1.资源的申请 2.计算的准备 任务发给资源

Client与Cluster区别在于Driver程序运行的节点位置

Spark-Yarn提交流程

  1. 任务提交
  2. 向ResourceManager通讯申请启动ApplicationMaster
  3. ApplicationMaster选择合适的节点借用NodeManager启动一个container
  4. 在container中运行AppMatser=Driver
  5. Driver启动后,向RM申请container运行Executor进程
  6. Executor进程启动后反向向Driver进行注册
  7. 全部注册完成后开始执行main函数
  8. 执行到action算子,触发一个job,根据是否发生shuffle开始划分stage
  9. 每个stage生成对应的TaskSet[task1,task2,task3…]
  10. 然后将task分发到各个Executor上执行
[shenneng@hadoop102 spark-yarn]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

0x2 Spark框架

Spark和Hadoop的根本差异是多个作业之间的数据通信问题

Spark运行模式示意图

Spark运行环境 用法
Local模式 测试
Standalone模式 独立部署Matser-Worker
自身提供计算资源,降低了耦合性
Yarn模式 提供集群模式
K8S模式
Mesos模式 Twitter
Windows模式

1.Yarn模式

Yarn-Cluster提交流程

Client与Cluster的主要区别是Driver是否在本地运行

2.分布式计算模拟

分布式计算核心-拆分Task

通过简单的分布式计算模拟,理解任务的拆分,运行的模块,并行的原理,RDD的封装,底层数据结构

DRIVER

//DRIVER

package com.ecust.saprkcore

import java.io.{ObjectOutputStream, OutputStream}
import java.net.{ServerSocket, Socket}

/**
 * @author Jinxin Li
 * @create 2020-12-31 13:43
 */
object Driver {
  def main(args: Array[String]): Unit = {

    //进行逻辑的封装,计算的准备,数据的提交

    val client1 = new Socket("localhost", 9999)
    val client2 = new Socket("localhost", 8888)

    val out1: OutputStream = client1.getOutputStream
    val out2: OutputStream = client2.getOutputStream

    val objOut1 = new ObjectOutputStream(out1)
    val objOut2 = new ObjectOutputStream(out2)

    val  task = new Task()

    val subTask1 = new SubTask()
    subTask1.logic=task.logic
    subTask1.data=task.data.take(2)

    val subTask2 = new SubTask()
    subTask2.logic=task.logic
    subTask2.data=task.data.takeRight(2)

    objOut1.writeObject(subTask1)
    objOut1.flush()
    objOut1.close()

    objOut2.writeObject(subTask2)
    objOut2.flush()
    objOut2.close()

    //发送,注意在网络中传递的数据要进行序列化,不可能传递对象,必须序列化
    println("任务发送完毕")

    //关闭客户端
    client1.close()
    client2.close()
  }
}

EXECUTOR1

object Executor1 {
  def main(args: Array[String]): Unit = {

    //启动服务器,接受数据
    val server = new ServerSocket(9999)

    println("服务器9999启动,等待接受数据...")

    val client: Socket = server.accept()

    val in: InputStream = client.getInputStream

    val objIn = new ObjectInputStream(in)

    val task = objIn.readObject().asInstanceOf[SubTask]

    val ints: List[Int] = task.compute()

    println("接收到客户端9999接受的数据:"+ints)

    objIn.close()
    client.close()
    server.close()
  }
}

EXECUTOR2

object Executor2 {
  def main(args: Array[String]): Unit = {

    //启动服务器,接受数据
    val server = new ServerSocket(8888)

    println("服务器9999启动,等待接受数据...")

    val client: Socket = server.accept()

    val in: InputStream = client.getInputStream

    val objIn = new ObjectInputStream(in)

    val task= objIn.readObject().asInstanceOf[SubTask]

    val ints: List[Int] = task.compute()

    println("接收到客户端8888接受的数据:"+ints)

    objIn.close()
    client.close()
    server.close()
  }
}

SUBTASK

class SubTask extends Serializable {
  //这是一种特殊的数据结构,其中包含了数据的格式,数据的计算逻辑与算子转换
  //接收到数据之后,可以进行计算
  //RDD 广播变量 累加器 就是类似的数据结构
  var data :List[Int] = _
  var logic:Int=>Int = _

  //计算任务
  def  compute() ={
    data.map(logic)
  }
}

TASK

class Task extends Serializable {//实现序列化 特质
  //包含原数据的数据结构
  val data = List(1, 2, 3, 4)

  val function: Int => Int = (num: Int) => {
    num * 2
  }

  //注意函数的类型是Int=>Int
  val logic:Int=>Int = _*2

  //计算任务
  def  compute() ={
    data.map(logic)
  }
}

3.RDD与IO

字节流&字符流

InputStream in = new FileInputStream("path")
int i = -1
while(i = in.read()!=-1){
    println(i)
}
graph LR
File-->FileInputStream--read-->console

缓冲流

InputStream in = new BufferedInputStream(new FileInputStream("path"))
int i = -1
while(i = in.read()!=-1){
    println(i)
}
graph LR
File-->FileInputStream-->BufferedInputStream--read-->console

缓冲区的缓冲流

转换流InputStreamReader

Reader in = new BufferedReader(
    new InputStreamReader(
        new FileInputStream("path"),
        "UTF-8"
        )
    )
String s = null
while((s=in.readLine())!=null){
    println(i);
    )

装饰者设计模式的IO

可以看出核心是FileInputFormat,转换流与缓冲流都是包装,这种设计模式成为装饰者设计模式

哪些inputformat,都是对读取逻辑的封装,没有真正的读取数据

readLine才会真正的执行,new的过程仅仅是建立连接,但是没有真正的读取,有种延迟加载的感觉

RDD的组装方式,与IO的封装也是非常的类似

new HadoopRDD//textFile
new MapPartitionsRDD()//flatMap
new MapPartitionsRDD()//map
new ShuffleRDD()//reduceByKey

//执行
rdd.collect()

一层一层的包装

RDD的装饰者模式理解

RDD的数据处理方式类似于IO流,也有装饰者设计模式

RDD的数据只有在调用collect方法时,才会真正的执行

RDD是不保存数据的,但是IO可以临时保存一部分数据

4. 分区

RDD是一个最基本的数据处理模型

类似于Kafka中的分区,我们将数据进行分区,分区之后分成不成的Task,可以分发至Executor进行计算

RDD是最小的数据处理单元,里面包含了分区信息,提高并行计算的能力

数据的分区

0x3 spark核心

1. spark核心三大数据结构

RDD : 弹性分布式数据集

累加器:分布式共享只写变量

广播变量:分布式共享只读变量

2.RDD基本概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

1.弹性

存储的弹性:内存与磁盘的自动切换;

容错的弹性:数据丢失可以自动恢复;

计算的弹性:计算出错重试机制;

分片的弹性:可根据需要重新分片。

2.分布式:数据存储在大数据集群不同节点上

3.数据集:RDD封装了计算逻辑,并不保存数据

4.数据抽象:RDD是一个抽象类,需要子类具体实现

5.不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑

6.可分区、并行计算

3.RDD核心属性

RDD详细描述

SparkRDD的核心属性

图:Spark RDD核心属性

  1. 粗粒度操作(无法对单个元素进行操作)

  2. 内存中计算

  3. 懒执行

  4. 不变性

  5. 容错性

  6. 持久性(cache可以选择等级与checkpoint)

    --数据缓存
    wordToOneRdd.cache()
    --可以更改存储级别
    mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
    
    --设置检查点路径
    sc.setCheckpointDir("./checkpoint1")
    --数据检查点:针对wordToOneRdd做检查点计算
    wordToOneRdd.checkpoint()
    --一般两者联合使用
  7. 可分区(分区列表)

  8. 粘度分区(自定分区)

4.RDD缺点

  1. 没有内置的优化引擎,RDD无法利用Spark的高级优化器(包括catalyst optimizer与Tungsten执行引擎)的优势。开发人员需要根据其属性优化每个RDD
  2. 只能处理结构化数据与DataFrame和数据集不同,RDD不会推断所摄取数据的模式,而是需要用户指定它。
  3. 性能限制,作为内存中的JVM对象,RDD涉及垃圾收集和Java序列化的开销,这在数据增长时非常昂贵。
  4. 没有足够的内存来存储RDD时,它们会拖慢运行速度。也可以将RDD的该分区存储在不适合RAM的磁盘上。结果,它将提供与当前数据并行系统类似的性能。

5.RDD的来源

  1. 使用集合创建parallelize MakeRDD
  2. 外部存储文件创建RDD textfile
  3. 从其他RDD创建(血缘关系,cache,checkpoint)
  4. 直接创建RDD 内部使用

6.RDD的==分区分片问题==

RDD分区意味着一个分区一个job么,

RDD分区3意味着要在三个executor里执行么

重新分区,加入三个executor在不同的container里是如何发生shuffle里的,还是三个分区是一个job,这一个job在一个container里执行

7.RDD的序列化

从计算的角度, 算子以外的代码都是在Driver端执行, 算子里面的代码都是在Executor端执行。

那么在scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误

所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12版本后闭包编译方式发生了改变

Kryo序列化框架

Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。

即使使用Kryo序列化,也要继承Serializable接口

8.RDD依赖与血缘

8.1概述

RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

相邻的两个RDD之间的关系称为依赖关系

多个连续的RDD的依赖关系,称之为血缘关系

我们的每一个RDD都会保存我们的血缘关系,会保存之前的血缘关系

RDD为了提供容错性,需要将RDD间的关系保存下来,一旦出现错误,可以根据血缘关系将数据源重新计算

val rdd1 = rdd.map(_.2)
//新的RDD依赖于旧的RDD
graph LR
RDD1--依赖-->RDD2--依赖-->RDD3--依赖-->RDD4
RDD4--flatmap-->RDD3
RDD3--map-->RDD2
RDD2--reduceByKey-->RDD1

8.2血缘关系的查看

//血缘关系的演示
//每个RDD记录了以前所有的血缘关系
package com.test
import org.apache.spark.api.java.JavaSparkContext.fromSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author Jinxin Li
 * @create 2020-10-26 10:04
 */
object wordCount {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setAppName("MyWordCount").setMaster("local[*]")
    val sc = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("./word.txt")
    println("++++++++++++++++++++++++++++++++++=")
    println(lines.toDebugString)//打印血缘关系
    val words: RDD[String] = lines.flatMap(_.split(" "))
    println("++++++++++++++++++++++++++++++++++=")
    println(words.toDebugString)
    val pairs: RDD[(String, Int)] = words.map((_, 1))
    println("++++++++++++++++++++++++++++++++++=")
    println(pairs.toDebugString)
    val word: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
    println("++++++++++++++++++++++++++++++++++=")
    println(word.toDebugString)
    println("++++++++++++++++++++++++++++++++++=")
    word.collect().foreach(println(_))
    sc.close;
  }
}
++++++++++++++++++++++++++++++++++=
(2) ./word.txt MapPartitionsRDD[1] at textFile at wordCount.scala:15 []
 |  ./word.txt HadoopRDD[0] at textFile at wordCount.scala:15 []
++++++++++++++++++++++++++++++++++=
(2) MapPartitionsRDD[2] at flatMap at wordCount.scala:18 []
 |  ./word.txt MapPartitionsRDD[1] at textFile at wordCount.scala:15 []
 |  ./word.txt HadoopRDD[0] at textFile at wordCount.scala:15 []
++++++++++++++++++++++++++++++++++=
(2) MapPartitionsRDD[3] at map at wordCount.scala:21 []
 |  MapPartitionsRDD[2] at flatMap at wordCount.scala:18 []
 |  ./word.txt MapPartitionsRDD[1] at textFile at wordCount.scala:15 []
 |  ./word.txt HadoopRDD[0] at textFile at wordCount.scala:15 []
++++++++++++++++++++++++++++++++++=
(2) ShuffledRDD[4] at reduceByKey at wordCount.scala:24 []
    //这个地方断开,表示shuffle +-
 +-(2) MapPartitionsRDD[3] at map at wordCount.scala:21 []
    |  MapPartitionsRDD[2] at flatMap at wordCount.scala:18 []
    |  ./word.txt MapPartitionsRDD[1] at textFile at wordCount.scala:15 []
    |  ./word.txt HadoopRDD[0] at textFile at wordCount.scala:15 []
++++++++++++++++++++++++++++++++++=

可以看出每个RDD会存储所有的血缘关系

同时使用dependices可以查看依赖关系

object wordCount {
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setAppName("MyWordCount").setMaster("local[*]")
    val sc = new SparkContext(config)
    val lines: RDD[String] = sc.textFile("./word.txt")
    println("++++++++++++++++++++++++++++++++++=")
    println(lines.dependencies)
    val words: RDD[String] = lines.flatMap(_.split(" "))
    println("++++++++++++++++++++++++++++++++++=")
    println(words.dependencies)
    val pairs: RDD[(String, Int)] = words.map((_, 1))
    println("++++++++++++++++++++++++++++++++++=")
    println(pairs.dependencies)
    val word: RDD[(String, Int)] = pairs.reduceByKey(_ + _)
    println("++++++++++++++++++++++++++++++++++=")
    println(word.dependencies)
    println("++++++++++++++++++++++++++++++++++=")
    word.collect().foreach(println(_))
    sc.close;
  }
}
++++++++++++++++++++++++++++++++++=
List(org.apache.spark.OneToOneDependency@1a2bcd56)
++++++++++++++++++++++++++++++++++=
List(org.apache.spark.OneToOneDependency@3c3a0032)
++++++++++++++++++++++++++++++++++=
List(org.apache.spark.OneToOneDependency@5e519ad3)
++++++++++++++++++++++++++++++++++=
List(org.apache.spark.ShuffleDependency@765d55d5)
++++++++++++++++++++++++++++++++++=

可以看出存在两种依赖关系,一种OneToOneDependency与ShuffleDependency

新的RDD的一个分区的数据依赖于旧的RDD的一个分区的数据,这种依赖称之为OneToOne依赖

新的RDD的一个分区的数据依赖于旧的RDD的多个分区的数据,这种依赖称为Shuffle依赖(数据被打乱重新组合)

源码中的依赖关系

RDD依赖关系的继承关系

宽依赖的图

8.3阶段划分与源码

Shuffle划分阶段

如果是oneToOne不需要划分阶段

不同的阶段要保证Task执行完毕才能执行下一个阶段

阶段的数量等于shuffle依赖的数量+1

collect
dagScheduler.runjob
val waiter = submitJob//DAGScheduler-681
//让下翻
override def run(): Unit = eventProcessLoop.post(JobSubmitted)//DAGScheduler-714

private[scheduler] def handleJobSubmitted//DAGScheduler-975
{
    var finalStage: ResultStage = null//判定finalStage是否存在 985
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)//如果不存在则创建一个空的ResultStage 986
}
//也就是说ResultStage只有一个
private def createResultStage:ResultStage = {
    //445
    val parents = getOrCreateParentStages(rdd, jobId)//有没有上一个阶段,这个rdd是当前的reduceBykey最后的rdd
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stage
  }

/**获得父阶段列表*/
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {//466
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)//一个shuffle就会转换为一个阶段
    }.toList
  }

private[scheduler] def getShuffleDependencies(//508
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd//放入当前rdd reduceByKey的rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {//判断之前是否访问过
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep//模式匹配判断是否是shuffle依赖
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    parents
  }

private def getOrCreateShuffleMapStage( //338
    ... ShuffleMapStage = {...
        createShuffleMapStage(shuffleDep, firstJobId)
                          }
    
 def createShuffleMapStage[K, V, C](//384
      ... ShuffleMapStage = {
    val rdd = shuffleDep.rdd
          ...
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage()
      ...
    stage
  }

8.4RDD的任务划分

行动算子底层是runJob

Application:初始化一个SparkContext即生成一个Application

Job:一个Action算子就会生成一个Job

Stage:Stage等于宽依赖(ShuffleDependency)+1

Task:一个Stage阶段中,最后一个RDD分区个数就是Task的个数

注意:Application->Job->Stage->Task每一层都是一对n的关系

提交过程是一个阶段一个阶段的提交

private def submitStage(stage: Stage): Unit = {//1084
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug(s"submitStage($stage (name=${stage.name};" +
        s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)//有没有上一级阶段
        logDebug("missing: " + missing)
        if (missing.isEmpty) {//如果没有上一级的stage,则为空
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)//为空就提交stage/tasks
        } else {
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }


val tasks: Seq[Task[_]] = try {//1217
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {//匹配的阶段类型
        case stage: ShuffleMapStage =>//shuffleMaptask
          //new 几个跟map相关,ShuffleMapStage
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>//计算分区
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>//这里面有多少元素
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)//到底有多个new
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
}

//计算分区
// Figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions(){}
    
//ResultStage 61
override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
}//此处的job.numPartitions就是最后一个RDD的分区
//三个分区就是0-3
//一个RDD的三个分区,从并行角度就会分配为3个Task

//SuffleMapStage 91
override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}

一个应用程序会对应多个job(一个行动算子算是一个job)

ShuffleMapStage=>ShuffleMapTask

ResultStage=>ResultTask

9.RDD分区器

  1. Hash分区(默认)
  2. Range分区
  3. 自定义分区

只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余

Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

1.自定义分区器

a>HashPartitioner

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    //只有k-v值才有分区器
    rdd1.saveAsTextFile("./output")
    //使用hash分区器
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(new HashPartitioner(3))
    rdd2.saveAsTextFile("./output2")
  }
}

b>RangePartitioner

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1= sc.textFile("Day06/input/number").map((_, 1))
    //只有k-v值才有分区器
    rdd1.saveAsTextFile("./output")
    //使用rangePartitioner
    val value = new RangePartitioner[String, Int](2, rdd1.sample(false, 0.5))
    //range分区器的使用,要定义泛型,传递分区,传递sample
    //首先要传递一个分区,传递一个
    rdd1.partitionBy(value)
    rdd1.saveAsTextFile("./output2")
  }
}

c>自定义分区

//自定义分区器
case class MyPartitioner(numPartition:Int) extends Partitioner {
    override def numPartitions: Int = numPartition
    override def getPartition(key: Any): Int = (math.random() * numPartition).toInt
}
//自定义分区器的使用
object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1= sc.textFile("Day06/input/number").map((_, 1))
    //只有k-v值才有分区器
    rdd1.saveAsTextFile("./output")
    //使用rangePartitioner
    val value = new RangePartitioner[String, Int](2, rdd1.sample(false, 0.5))
    //range分区器的使用,要定义泛型,传递分区,传递sample
    //首先要传递一个分区,传递一个
    val value1: RDD[(String, Int)] = rdd1.partitionBy(MyPartitioner(2))
    value1.saveAsTextFile("./output2")
  }
}

10.RDD累加器

系统累加器

object MyAccumulator {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("myAccumulator"))
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
    //声明系统累加器
    val sum: LongAccumulator = sc.longAccumulator("sum")
    rdd.foreach(
      num=>{
        sum.add(num)
      }
    )
    //获取累加器
    println(sum.value)
  }
}

自定义累加器

1.自定wordcount累加器

//自定义累加器实现wordcount
class DefineAccumulator extends AccumulatorV2[String,mutable.Map[String,Int]]{
  val map: mutable.Map[String, Int] = mutable.Map()
  //判断累加器是否为初始状态
  override def isZero: Boolean = {map.isEmpty}
  //复制累加器
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {new DefineAccumulator()}
  //重置累加器
  override def reset(): Unit = map.clear()
  //区内相加
  override def add(v: String): Unit = {
    //区内相加的定义,如果存在元素,就对key值+1,如果不存在,就添加当前元素,key+1
    map(v)=map.getOrElse(v,0)+1
  }
  //区间相加
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    //区间相加,固定
    val map1: mutable.Map[String, Int] = this.value
    val map2: mutable.Map[String, Int] = other.value
    map2.foreach{
      case (k,v) => map1(k)=map1.getOrElse(k,0)+v
    }
  }
  override def value: mutable.Map[String, Int] = map
}

2.注册并使用定义累加器

object MyAccumulator2 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulator"))
    val word: RDD[String] = sc.textFile("./word.txt")
    val words = word.flatMap(_.split(" "))
    //new出累加器
    val uacc = new DefineAccumulator
    //注册累加器
    sc.register(uacc)
    //使用累加器
    words.foreach(uacc.add(_))
    println(uacc.value)//注意输出为accumulator的值
  }
}

11.广播变量

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

在整个队列中,仅仅存在一次

object BoardCast {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("emm").setMaster("local[*]"))
    val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
    val list = List(("a",4), ("b", 5), ("c", 6), ("d", 7))
    val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
    val value = rdd1.map {
      case (key, num) => {
        var num1 = 0
        for ((k, v) <- broadcast.value) {
          if (k == key){
            num1=v
          }
        }
        (key, num+num1)
      }
    }
    value.collect().foreach(println)
  }
}

12.RDD的持久化

12.1为什么要使用RDD的持久化

数据不存储在RDD中

RDD的重用

如果一个RDD需要重复使用,需要从头再次执行来获取数据

RDD的对象可以重用,但是数据没法重用

object SparkCore02_RDD_Persist {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCountPersist").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)

    val list = List("Hello Spark", "Hello Scala")

    //生成RDD RDD中不存储数据
    val listRDD: RDD[String] = sc.makeRDD(list, 1)
    val wordRDD: RDD[String] = listRDD.flatMap(word => word.split(" "))
    val tupleRDD: RDD[(String, Int)] = wordRDD.map(word =>
    { println("map阶段")
      (word, 1)
    })

    //分组的操作
    val groupRDD: RDD[(String, Iterable[Int])] = tupleRDD.groupByKey()
    val resultRDD: RDD[(String, Int)] = tupleRDD.reduceByKey(_ + _)

    resultRDD.collect().foreach(println)
    println("------------华丽分割线----------------")
    groupRDD.collect().foreach(println)
    sc.stop()
  }
}
//结果
map阶段
map阶段
map阶段
map阶段
(Spark,1)
(Hello,2)
(Scala,1)
------------华丽分割线----------------
map阶段
map阶段
map阶段
map阶段
(Spark,CompactBuffer(1))
(Hello,CompactBuffer(1, 1))
(Scala,CompactBuffer(1))

发现map阶段运行了两波,所有的执行都会从头开始计算

这样的执行影响了效率

要想解决这个问题,数据持久化提高效率

RDD持久化的作用

//进行缓存
tupleRDD.cache() //本质是persist

//tupleRDD.cache()
tupleRDD.persist(StorageLevel.MEMORY_AND_DISK)
//memory_only当内存不够的情况下,数据不能溢写到磁盘,会丢失数据
//memory_and_disk当内存不够的情况下,会将数据落到磁盘
//持久化操作,必须在行动算子执行时,完成的
sc.setCheckpointDir("./checkPoint")
//一般要保存到分布式存储中
tupleRDD.checkpoint()
//检查点路径,在作业执行完毕之后也是不会删除的

RDD对象的持久化操作不一定为了重用,在数据执行较长,或者数据比较重要的场合也可以进行持久化操作

12.2 三种持久化方法

  1. cache:将数据临时存储在内存中进行数据重用,会添加新的依赖,出现问题从头开始计算

  2. persist:将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘IO,性能较低,但是数据安全,如果作业执行完毕,临时保存在数据文件就会丢失

  3. checkpoint:将磁盘长久地保存在磁盘文件中进行数据重用,涉及到磁盘IO时,性能较低,但是会切断血缘关系,相当于改变数据源

    但是数据安全,为了保证数据安全,所以一般情况下,会独立的执行作业,为了能够提高效率,一般情况下,会跟cache联合使用,先cache在使用checkpoint这个时候会保存cache的文件,而不会独立的跑一个单独的任务

graph LR
sc-->map-reduceByKey-->cache--保存-->CheckPoint
cache-->collect

大区别

cache会添加新的依赖

package com.ecust.rdd.persist

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author Jinxin Li
 * @create 2020-12-24 14:03
 */
object SparkCore03_RDD_CheckPoint {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCountPersist").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)

    val list = List("Hello Spark", "Hello Scala")

    val listRDD: RDD[String] = sc.makeRDD(list, 1)
    val wordRDD: RDD[String] = listRDD.flatMap(word => word.split(" "))
    val tupleRDD: RDD[(String, Int)] = wordRDD.map(word => (word, 1))
    tupleRDD.cache()
    println(tupleRDD.toDebugString)//运行之前看血缘关系

//    tupleRDD.persist(StorageLevel.MEMORY_AND_DISK)

//    sc.setCheckpointDir("./checkPoint")
//    tupleRDD.checkpoint()

    val groupRDD: RDD[(String, Iterable[Int])] = tupleRDD.groupByKey()
    groupRDD.collect().foreach(println)
    println("----------------------------")
    println(tupleRDD.toDebugString)//运行之后看血缘
    sc.stop()
  }
}
(1) MapPartitionsRDD[2] at map at SparkCore03_RDD_CheckPoint.scala:21 [Memory Deserialized 1x Replicated]
 |  MapPartitionsRDD[1] at flatMap at SparkCore03_RDD_CheckPoint.scala:20 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at makeRDD at SparkCore03_RDD_CheckPoint.scala:19 [Memory Deserialized 1x Replicated]
(Spark,CompactBuffer(1))
(Hello,CompactBuffer(1, 1))
(Scala,CompactBuffer(1))
----------------------------
(1) MapPartitionsRDD[2] at map at SparkCore03_RDD_CheckPoint.scala:21 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 1; MemorySize: 368.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B//这里添加了新的依赖
 |  MapPartitionsRDD[1] at flatMap at SparkCore03_RDD_CheckPoint.scala:20 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at makeRDD at SparkCore03_RDD_CheckPoint.scala:19 [Memory Deserialized 1x Replicated]

因此cache会在血缘关系中添加新的依赖,一旦出现问题,可以重头读取数据

//使用checkPoint之后
(1) MapPartitionsRDD[2] at map at SparkCore03_RDD_CheckPoint.scala:21 []
 |  MapPartitionsRDD[1] at flatMap at SparkCore03_RDD_CheckPoint.scala:20 []
 |  ParallelCollectionRDD[0] at makeRDD at SparkCore03_RDD_CheckPoint.scala:19 []
(Spark,CompactBuffer(1))
(Hello,CompactBuffer(1, 1))
(Scala,CompactBuffer(1))
----------------------------
(1) MapPartitionsRDD[2] at map at SparkCore03_RDD_CheckPoint.scala:21 []
 |  ReliableCheckpointRDD[4] at collect at SparkCore03_RDD_CheckPoint.scala:31 []

使用checkPoint会切断血缘关系,重新建立新的血缘关系等同于改变数据源

12.3源码解析CheckPoint单独执行任务

//SparkContext.scala 2093-2095
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()//-->
//runJob之后调用了doCheckPoint()方法

//-----------------------------
//RDD.scala 1789-1805 doCheckPoint
if (checkpointData.isDefined) {
    if (checkpointAllMarkedAncestors) {
        dependencies.foreach(_.rdd.doCheckpoint())
          }
    checkpointData.get.checkpoint()//如果需要checkPoint然后进行checkPoint
} else {
    dependencies.foreach(_.rdd.doCheckpoint())
}

//----------------------------------\
//org.apache.spark.rdd.LocalRDDCheckpointData 53-54
if (missingPartitionIndices.nonEmpty) {
      rdd.sparkContext.runJob(rdd, action, missingPartitionIndices)
}//单独执行任务

12.4使用CheckPoint恢复计算

checkpoint会将结果写到hdfs上,当driver 关闭后数据不会被清除。所以可以在其他driver上重复利用该checkpoint的数据。

checkpoint write data:

sc.setCheckpointDir("data/checkpoint")
val rddt = sc.parallelize(Array((1,2),(3,4),(5,6)),2)
rddt.checkpoint()
rddt.count() //要action才能触发checkpoint

read from checkpoint data:

package org.apache.spark

import org.apache.spark.rdd.RDD

object RDDUtilsInSpark {
  def getCheckpointRDD[T](sc:SparkContext, path:String) = {
  	//path要到part-000000的父目录
    val result : RDD[Any] = sc.checkpointFile(path)
    result.asInstanceOf[T]
  }
}

note:因为sc.checkpointFile(path)是private[spark]的,所以该类要写在自己工程里新建的package org.apache.spark中

example:

val rdd : RDD[(Int, Int)]= RDDUtilsInSpark.getCheckpointRDD(sc, "data/checkpoint/963afe46-eb23-430f-8eae-8a6c5a1e41ba/rdd-0")
   println(rdd.count())
   rdd.collect().foreach(println)

这样就可以原样复原了。

Demo

object SparkCore05_RDD_CheckPointUse {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCountPersist").setMaster("local[1]")
    val sc = new SparkContext(sparkConf)
	//使用工具类,注意工具类的包,要自己建立,注意泛型
    val rdd: RDD[(String, Int)] = RDDUtilsInSpark.getCheckpointRDD[RDD[(String, Int)]](sc, "./checkPoint/1186c961-ddb4-4af5-b7dc-6cc99776490b/rdd-2")
      //之前的map之后reduceBykey之前的checkPoint文件

    val result: RDD[(String, Int)] = rdd.reduceByKey(_ + _)

    result.collect().foreach(println)
    sc.stop()
  }
}
import org.apache.spark.rdd.RDD

//可以恢复checkPoint的工具类,注意放置的包
object RDDUtilsInSpark {
  def getCheckpointRDD[T](sc: SparkContext, path: String) = {
    //path要到part-000000的父目录
    val result: RDD[Any] = sc.checkpointFile(path)
    result.asInstanceOf[T]
  }
}

13.RDD自定义分区器

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

1)注意:

(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

package com.ecust.rdd.partition

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

/**
 * @author Jinxin Li
 * @create 2020-12-26 10:52
 * 自定义分区规则
 */
object SparkCore01_RDD_Partitioner {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("Partitioner")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[(String, String)] = sc.makeRDD(List(
      ("nba", "xxxxxxxxxxxxxxx"),
      ("wba", "aaaaaaaaaaaaaa"),
      ("cba", "dddddddddddd"),
      ("wcba", "ppppppppppppppppppppppp")
    ), 3)

    /*
    自动义分区器,决定数据去哪个分区
     */
    val rddPar: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner())

    rddPar.saveAsTextFile("./par")

    sc.stop()
  }

  class MyPartitioner extends Partitioner{
    //分区数量
    override def numPartitions: Int = 3

    //返回Int类型,返回数据的分区索引 从零开始
    //Key表示数据的KV到底是什么
    //根据数据的key值返回数据所在分区索引
    override def getPartition(key: Any): Int = {
      key match {
        case "nba" => 0
        case "cba" => 1
        case _ => 2
      }

      /*if (key == "nba"){
        0
      } else if(key == "cba"){
        1
      }else{
        2
      }*/
    }
  }

}

14.RDD的存储与保存

Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS以及数据库。

//集群文件系统存储示例
hdfs://hadoop102:8020/input/1.txt

15.RDD的累加器

如果没有累加器,我们计算时只能使用reduce,要想把executor的变量拉回到Driver困难

引入问题

//学前测试

object SparkCore02_RDD_accumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("Partitioner")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),1)

    var sum:Int = 0
    //行动算子返回非RDD
    rdd.foreach(num=>{
      sum+=num
      println("executor:"+sum)
    })

    println("driver:"+sum)//打印结果为零,Driver->executor,结果返回不了
    sc.stop()

  }
}

累加器的主要目的

累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

Long累加器Demo

object SparkCore03_RDD_accumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("Partitioner")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),1)

    //todo 自定义累加器
    val sum: LongAccumulator = sc.longAccumulator("sum")
    //系统自带了一些累加器
//    sc.doubleAccumulator
//    sc.collectionAccumulator()

    rdd.foreach(num=>sum.add(num))

    println("driver:"+sum.value)
    sc.stop()
  }
}

特殊情况

少加:转换算子中调用累加器,如果没有行动算子的话,name不会执行

多加:转换算子中调用累加器,多次行动算子会调用多次,一般会放在行动算子中进行操作

object SparkCore04_RDD_accumulator {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("Partitioner")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),1)

    //todo 自定义累加器
    val sum: LongAccumulator = sc.longAccumulator("sum")
    //系统自带了一些累加器
//    sc.doubleAccumulator
//    sc.collectionAccumulator()

    val result: RDD[Unit] = rdd.map(num => sum.add(num))
    
    result.collect()
    result.collect()//两个行动算子会多加

    println("driver:"+sum.value)
    sc.stop()
  }
}

16.RDD的自定义广播变量

分布式共享只写变量

表示累加器的值互相之间是没法访问的,自己能读自己,只有Driver进行读到,然后在Driver端进行合并

我们可以将一些Shuffle的东西使用累加器来实现(==优化==)

比方:需要shuffle的方法就不要shuffle了

闭包数据,都是以Task为单位发送的,每个人物中包含的闭包数据这样可能会导致,一个Executor中含有大量的重复的数据,并且占用大量的内存

Executor本质其实就是JVM,所以在启动时,会自动分配内存

完全可以将任务中的闭包数据放置到Executor的内存中,达到共享的目的

Spark中的广播变量可以将闭包的数据保存在Executor的内存中

分布式共享只读变量

graph TD
map-->Executor/task1
map-->Executor/task2
map-->Executor/task3
object SparkCore04_RDD_BroadCast {
  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("acc")
    val sc = new SparkContext(sparkConf)

    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))

    val map: mutable.Map[String, Int] = mutable.Map(("a", 4), ("b", 5), ("c", 6))

    //定义广播变量
    val value: Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)

    //每个task都有一份数据
    val result: RDD[(String, (Int, Int))] = rdd1.map { case (w, c) => {
      val i: Int = value.value.getOrElse(w, 0)
      (w, (c, i))
    }}
    result.collect().foreach(println)
    sc.stop()
  }
}

文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录