Spark: Accumulators and Partitioners


0x1 Using the Long Accumulator

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object selfAccumulator {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulator"))
    val word: RDD[String] = sc.textFile("Day06/input/word.txt")
    val words = word.flatMap(_.split(" "))
    val accumulator: LongAccumulator = sc.longAccumulator // define a built-in numeric accumulator
    words.foreach {
      case "hello" => accumulator.add(1)
      case _ =>
    }
    println("hello:" + accumulator.value) // read the result via accumulator.value on the driver
  }
}
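One caveat worth keeping in mind: Spark only guarantees exactly-once accumulator updates inside actions such as foreach. If the update happens inside a transformation such as map, task retries or stage recomputation can apply it more than once. A minimal sketch of the risky pattern, reusing the words RDD and accumulator from above:

// Risky pattern: the accumulator is updated inside a transformation,
// so a recomputed stage would increment it a second time.
val tagged = words.map { w =>
  if (w == "hello") accumulator.add(1)
  w
}
tagged.count() // updates only happen once an action actually runs the job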

0x2 Custom Partitioners

1. HashPartitioner

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    // only key-value RDDs can have a partitioner
    rdd1.saveAsTextFile("./output")
    // repartition with a HashPartitioner over 3 partitions
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(new HashPartitioner(3))
    rdd2.saveAsTextFile("./output2")
  }
}
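For reference, HashPartitioner places each key at key.hashCode modulo the number of partitions, adjusted so negative hash codes still map to a valid partition. A standalone sketch of that rule (the idea behind it, not Spark's actual source):

// hashCode mod numPartitions, shifted into the range [0, numPartitions)
def hashPartition(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

hashPartition("hello", 3) // the partition index "hello" would get with 3 partitions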

2. RangePartitioner

import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    // only key-value RDDs can have a partitioner
    rdd1.saveAsTextFile("./output")
    // RangePartitioner takes explicit key/value type parameters, the number of
    // partitions, and a sample RDD used to estimate the range boundaries
    val partitioner = new RangePartitioner[String, Int](2, rdd1.sample(false, 0.5))
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(partitioner) // partitionBy returns a new RDD
    rdd2.saveAsTextFile("./output2")
  }
}
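To see where the keys actually landed, you can tag every element with its partition index. A small sketch, assuming the rdd2 from the example above:

// Tag each record with the index of the partition it lives in, then
// collect for inspection (fine here because the test data is small).
rdd2.mapPartitionsWithIndex { (idx, it) =>
  it.map { case (k, v) => (idx, k, v) }
}.collect().foreach(println)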

3. Custom Partitioner

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

// A custom partitioner only needs two overrides: numPartitions and getPartition.
// This one scatters keys randomly: simple for demonstration, but not deterministic per key.
case class MyPartitioner(numPartition: Int) extends Partitioner {
  override def numPartitions: Int = numPartition
  override def getPartition(key: Any): Int = Random.nextInt(numPartition)
}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    // only key-value RDDs can have a partitioner
    rdd1.saveAsTextFile("./output")
    // repartition with the custom partitioner
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(MyPartitioner(2))
    rdd2.saveAsTextFile("./output2")
  }
}
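Because MyPartitioner assigns partitions randomly, the same key can land in different partitions, which breaks operations that rely on co-location such as joins. A sketch of a deterministic variant, a hypothetical FirstCharPartitioner that routes each key by its first character:

import org.apache.spark.Partitioner

// Deterministic: the same key always maps to the same partition.
case class FirstCharPartitioner(numPartition: Int) extends Partitioner {
  override def numPartitions: Int = numPartition
  override def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty => s.head.toInt % numPartition // Char codes are non-negative
    case _ => 0 // empty strings and other key types fall back to partition 0
  }
}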

0x3 Custom Accumulator

package test.longAccumulator

import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * @create 2020-10-31 11:21
 */
object selfAccumulator {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulator"))
    val word: RDD[String] = sc.textFile("Day06/input/word.txt")
    val words = word.flatMap(_.split(" "))
    val accumulator: LongAccumulator = sc.longAccumulator // define a built-in numeric accumulator
    words.foreach{
      case "hello" => accumulator.add(1)
      case _ =>
    }
    println("hello:"+accumulator.value)//注意输出为accumulator的值
    println("--------------------------------------------------")
  }
}
// Custom accumulator:
// 1. extend AccumulatorV2
// 2. spell out both type parameters: the input type and the output type
// 3. accumulate into a collection (here a mutable Map)
class WCAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  private val map = mutable.Map.empty[String, Int] // Map.empty creates an empty mutable map to hold the counts

  // tells Spark whether this accumulator is still in its zero state
  override def isZero: Boolean = map.isEmpty

  // returns a copy of this accumulator; Spark ships copies to the executors
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val accumulator: WCAccumulator = new WCAccumulator
    accumulator.map ++= this.map // "this" is the instance being copied; carry its contents over
    accumulator
  }

  override def reset(): Unit = map.clear()

  // per-element update performed by tasks on the executors
  override def add(v: String): Unit = map.update(v, map.getOrElse(v, 0) + 1)

  // folds another task's partial result into this accumulator
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit =
    other.value.foreach { // each entry is a (word, count) pair
      case (word, count) =>
        map.update(word, map.getOrElse(word, 0) + count) // the core of the accumulation
    }

  override def value: mutable.Map[String, Int] = map
}
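The accumulator class above is defined but never exercised in main. A minimal usage sketch, assuming the same sc and words RDD from the driver code earlier: register it, update it inside an action, then read value on the driver.

// Register the custom accumulator, feed it every word in an action,
// then read the merged word counts back on the driver.
val wcAcc = new WCAccumulator
sc.register(wcAcc, "wordCount")
words.foreach(wcAcc.add)
println(wcAcc.value) // e.g. Map(hello -> 3, spark -> 2), depending on the input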
