0x1 Using the LongAccumulator
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object selfAccumulator {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulator"))
    val word: RDD[String] = sc.textFile("Day06/input/word.txt")
    val words = word.flatMap(_.split(" "))
    // Driver-side LongAccumulator; updates made on the executors are merged back on the driver.
    val accumulator: LongAccumulator = sc.longAccumulator
    words.foreach {
      case "hello" => accumulator.add(1)   // count only the word "hello"
      case _       =>
    }
    println("hello:" + accumulator.value)
  }
}
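A small variant of the same idea, as a sketch: giving the accumulator a name makes it visible in the Spark web UI, and updating it only inside an action such as foreach avoids the double counting that can happen when accumulator updates sit inside transformations that may be re-executed. The name "helloCount" is just an illustration.

// Sketch: a named LongAccumulator, assuming the same sc and word.txt as above.
val helloCount = sc.longAccumulator("helloCount")
sc.textFile("Day06/input/word.txt")
  .flatMap(_.split(" "))
  .foreach(w => if (w == "hello") helloCount.add(1))
println("hello:" + helloCount.value)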
0x2 Custom Partitioners
1. HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    rdd1.saveAsTextFile("./output")   // default partitioning, follows the input splits
    // Repartition by key hash into 3 partitions.
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(new HashPartitioner(3))
    rdd2.saveAsTextFile("./output2")
  }
}
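For reference, HashPartitioner assigns a key to hashCode mod numPartitions (made non-negative), with null keys going to partition 0, so the same key always lands in the same partition. A rough sketch of that logic (not the actual Spark source):

// Sketch of the idea behind HashPartitioner.getPartition.
def hashPartitionOf(key: Any, numPartitions: Int): Int = {
  if (key == null) 0
  else {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}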
2. RangePartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    rdd1.saveAsTextFile("./output")
    // RangePartitioner computes range bounds from a sample of the data, so keys are
    // split into sorted, roughly equal-sized ranges.
    val partitioner = new RangePartitioner[String, Int](2, rdd1.sample(false, 0.5))
    val rdd2: RDD[(String, Int)] = rdd1.partitionBy(partitioner)
    rdd2.saveAsTextFile("./output2")
  }
}
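To check where the keys actually end up, a small inspection sketch using mapPartitionsWithIndex (assuming the rdd2 from the example above and a small local test dataset):

// Print each (partition index, key) pair; only sensible for small data in local mode.
rdd2.mapPartitionsWithIndex { (idx, iter) =>
  iter.map { case (k, _) => s"partition=$idx key=$k" }
}.collect().foreach(println)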
3. Custom Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// A custom partitioner only has to implement numPartitions and getPartition.
// Note: this one assigns keys at random, so the same key may land in different
// partitions; a real partitioner should normally be deterministic per key.
case class MyPartitioner(numPartition: Int) extends Partitioner {
  override def numPartitions: Int = numPartition
  override def getPartition(key: Any): Int = scala.util.Random.nextInt(numPartition)
}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Main")
    val sc: SparkContext = new SparkContext(sparkConf)
    val rdd1: RDD[(String, Int)] = sc.textFile("Day06/input/number").map((_, 1))
    rdd1.saveAsTextFile("./output")
    val value1: RDD[(String, Int)] = rdd1.partitionBy(MyPartitioner(2))
    value1.saveAsTextFile("./output2")
  }
}
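If the goal is to route keys by their content rather than at random, a deterministic variant could look like the sketch below. PrefixPartitioner is a made-up name for illustration, and it reuses the org.apache.spark.Partitioner import from above.

// Hypothetical deterministic partitioner: keys starting with a digit go to
// partition 0, everything else to partition 1, so the same key always maps
// to the same partition.
case class PrefixPartitioner() extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty && s.head.isDigit => 0
    case _                                         => 1
  }
}

// Usage: rdd1.partitionBy(PrefixPartitioner()).saveAsTextFile("./output3")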
0x3 Custom Accumulators
package test.longAccumulator

import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object selfAccumulator {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("accumulator"))
    val word: RDD[String] = sc.textFile("Day06/input/word.txt")
    val words = word.flatMap(_.split(" "))
    // Built-in LongAccumulator: counts only "hello", for comparison.
    val accumulator: LongAccumulator = sc.longAccumulator
    words.foreach {
      case "hello" => accumulator.add(1)
      case _       =>
    }
    println("hello:" + accumulator.value)
    println("--------------------------------------------------")
    // Custom accumulator (defined below): must be registered with the SparkContext before use.
    val wcAccumulator = new WCAccumulator
    sc.register(wcAccumulator, "wordCount")
    words.foreach(wcAccumulator.add)
    println(wcAccumulator.value)
  }
}
// Custom accumulator: counts every word into a mutable Map[word, count].
class WCAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
  private val map = mutable.Map.empty[String, Int]

  // "Zero" means no words have been counted yet.
  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val accumulator: WCAccumulator = new WCAccumulator
    accumulator.map ++= this.map
    accumulator
  }

  override def reset(): Unit = map.clear()

  // Called on the executors for every element passed to add().
  override def add(v: String): Unit = map.update(v, map.getOrElse(v, 0) + 1)

  // Called on the driver to combine the partial maps produced by each task.
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit =
    other.value.foreach { case (word, count) =>
      map.update(word, map.getOrElse(word, 0) + count)
    }

  override def value: mutable.Map[String, Int] = map
}