Spark_SparkOperator


Operators

reduceByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
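The three keys above are all distinct, so there is nothing to merge; a quick sketch with a repeated key (hypothetical input) makes the per-key summation visible:

// Hypothetical input: key "a" appears twice, so its values are summed.
val wordCounts = sparkContext
  .makeRDD(List(("a", 1), ("b", 2), ("a", 3)))
  .reduceByKey(_ + _)
wordCounts.collect().foreach(println) // (a,4), (b,2) -- order may vary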

aggregateByKey

val dataRDD1 =
    sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 =
    dataRDD1.aggregateByKey(0)(_+_,_+_)
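With the same function passed twice, aggregateByKey behaves like reduceByKey with a zero value. Its point is that the intra-partition and inter-partition functions can differ; a small sketch (hypothetical input, 2 partitions) that takes the maximum inside each partition and then sums the maxima:

// Hypothetical: per partition take max(zero, value), then add the partition maxima per key.
val maxThenSum = sparkContext
  .makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
  .aggregateByKey(0)(math.max(_, _), _ + _)
maxThenSum.collect().foreach(println) // (a,6): max of partition 0 is 2, max of partition 1 is 4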

combineByKey

/**
 * @author Jinxin Li
 * @create 2020-10-30 8:53
 * combineByKey takes three functions:
 * 1. createCombiner: turns the first value of each key in a partition into the initial combiner
 * 2. mergeValue: merges the remaining values of that key into the combiner within the partition
 * 3. mergeCombiners: merges the combiners of the same key across partitions
 *
 * Data that has not gone through a shuffle keeps its original order.
 * Requirement: within each partition accumulate the string lengths, across partitions multiply the results.
 */
import org.apache.spark.{SparkConf, SparkContext}

object CombineByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[3]").setAppName("project1"))
    val list = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (1, "b")) // the last (1,"b") lands in the second partition, where it is the first value for key 1, so createCombiner is applied to it
    val input = sc.makeRDD(list, 2)

    val combineRdd = input.combineByKey(
      x => x.length,                               // createCombiner: length of the first value of each key within a partition
      (len: Int, str: String) => len + str.length, // mergeValue: add the next value's length inside the partition
      (len1: Int, len2: Int) => len1 * len2        // mergeCombiners: multiply the partition results; the parameters are not curried, so the combiner type cannot be inferred and must be written explicitly
    )
    combineRdd.saveAsTextFile("Day05/output") // expected result: (1,4) and (2,5)
  }
}

sortByKey

Stable sort: records with equal keys keep their relative order.

Common time-complexity classes for reference: O(c), O(n), O(n log n), O(n²); a comparison-based sort such as sortByKey is roughly O(n log n).

import org.apache.spark.{SparkConf, SparkContext}

object SortByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("project1"))
    val list= List(
      (1, "aa"), (1,"b"), (1,"a"),
      (2,"ccc"), (2, "95"), (1,"b"))
    val input = sc.makeRDD(list, 2)
    input.sortByKey().collect().foreach(println)
  }
}
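sortByKey also takes an ascending flag and a target number of partitions; a short sketch continuing the example above:

// Sort descending by key and collect into a single partition.
input.sortByKey(ascending = false, numPartitions = 1).collect().foreach(println)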

join

import org.apache.spark.{SparkConf, SparkContext}

object Join {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("project1"))
    val list = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (1, "b"))
    val input1 = sc.makeRDD(list, 2)

    val list2 = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (3, "b"))
    val input2 = sc.makeRDD(list2, 2)
    input1.join(input2).collect().foreach(println)
    // join pairs up records with the same key: input1 has 4 records with key 1 and input2 has 3,
    // so key 1 alone produces 4 * 3 = 12 output records
    // join is an inner join, so key 3 (present only in input2) disappears from the result
  }
}
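Because join is an inner join, keys that exist on only one side are dropped; the outer-join variants keep them. A minimal sketch, assuming the input1 and input2 above:

// rightOuterJoin keeps every key of input2; key 3 shows up with None on the input1 side.
input1.rightOuterJoin(input2).collect().foreach(println)
// e.g. (3,(None,b)) appears in the output alongside the inner-join pairs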

partitionBy (with some notes on partitioning)

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("project1"))
    val rdd = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(1,"aaa"),(2,"bbb"),(3,"ccc")),3)
    // makeRDD slices the collection into index ranges over the requested number of partitions
    // (by default, the number of cores); after partitionBy with a HashPartitioner the data is
    // redistributed by the hash of the key instead
    val rdd2 = rdd.partitionBy(new HashPartitioner(3))

    val fileRDD: RDD[String] = sc.textFile("Day05/input")
    fileRDD.saveAsTextFile("Day05/output")
    // textFile partitioning follows the Hadoop (HDFS) input splits;
    // minPartitions defaults to min(defaultParallelism, 2), so even a small file is read with at least two partitions
  }
}
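To check where the records actually end up, each partition can be inspected with glom; a small sketch, assuming the rdd2 produced by partitionBy above:

// glom turns each partition into an array, so the grouping per partition becomes visible.
rdd2.glom().collect().zipWithIndex.foreach { case (part, i) =>
  println(s"partition $i: ${part.mkString(", ")}")
}
// With HashPartitioner(3), keys 3, 1 and 2 land in partitions 0, 1 and 2 (key.hashCode % 3).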

count

// The defining trait of an action operator is that it returns a plain value instead of an RDD
// and triggers the execution of the job.
import org.apache.spark.{SparkConf, SparkContext}

object count {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("project1"))
    val rdd = sc.textFile("Day02/input/word.txt")
    // Returns the number of elements in the RDD
    val countResult: Long = rdd.count()
    println(countResult)
  }
}
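The lazy versus eager distinction is easy to observe: a transformation alone computes nothing until an action runs. A small sketch, assuming the same sc (hypothetical data):

// map is a transformation: nothing runs yet, it only records the lineage.
val mapped = sc.makeRDD(1 to 4).map { x => println(s"computing $x"); x * 2 }
// count is an action: only now does the job execute and the println side effects appear.
println(mapped.count()) // 4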

cogroup

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object cogroup {
  // cogroup groups the values of each key across both RDDs: for every key
  // the result carries one buffer (Iterable) per input RDD
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("project1"))
    val dataRDD1 = sc.makeRDD(List(("a",1),("a",2),("c",3)))
    val dataRDD2 = sc.makeRDD(List(("a",1),("c",2),("c",3)))

    val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
    value.foreach(println)
    //(a,(CompactBuffer(1, 2),CompactBuffer(1)))
    //(c,(CompactBuffer(3),CompactBuffer(2, 3)))
  }
}
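join can be viewed as a cogroup followed by a cross product of the two buffers per key; a minimal sketch of that equivalence, assuming the dataRDD1 and dataRDD2 above:

// For each key, pair every value from the left buffer with every value from the right buffer.
val joinedViaCogroup = dataRDD1.cogroup(dataRDD2).flatMapValues {
  case (leftValues, rightValues) =>
    for (l <- leftValues; r <- rightValues) yield (l, r)
}
joinedViaCogroup.collect().foreach(println) // same pairs as dataRDD1.join(dataRDD2)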

Accumulators

sc.longAccumulator
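A minimal sketch of typical sc.longAccumulator usage, assuming an existing SparkContext sc (the variable names and data are made up):

// Accumulators are written on the executors and read on the driver after an action has run.
val evenCount = sc.longAccumulator("evenCount")
val doubled = sc.makeRDD(1 to 10).map { x =>
  if (x % 2 == 0) evenCount.add(1)
  x * 2
}
doubled.count()          // an action must run before the value is reliable
println(evenCount.value) // 5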

A WordCount accumulator
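The figure that originally illustrated the WordCount accumulator is not preserved; below is a minimal sketch of such an accumulator built on AccumulatorV2, assuming an existing SparkContext sc (class and variable names are illustrative):

import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

// Accumulates word -> count into a mutable map; merge() combines maps coming from different executors.
class WcAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  private val counts = mutable.Map[String, Long]()
  override def isZero: Boolean = counts.isEmpty
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val acc = new WcAccumulator
    acc.counts ++= counts
    acc
  }
  override def reset(): Unit = counts.clear()
  override def add(word: String): Unit = counts(word) = counts.getOrElse(word, 0L) + 1
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit =
    other.value.foreach { case (w, c) => counts(w) = counts.getOrElse(w, 0L) + c }
  override def value: mutable.Map[String, Long] = counts
}

// Usage: register the accumulator, add to it on the executors, trigger an action, read on the driver.
val wcAcc = new WcAccumulator
sc.register(wcAcc, "wordCount")
sc.makeRDD(List("hello spark", "hello scala")).flatMap(_.split(" ")).foreach(wcAcc.add)
println(wcAcc.value) // Map(hello -> 2, spark -> 1, scala -> 1)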

Summary of Scala operators

Operator practice 1.1

//6.2.1
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Case1_1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("project")
    val sc: SparkContext = new SparkContext(conf)
    val source: RDD[Array[String]] = sc.textFile("Day07/input/user_visit_action.txt").map(_.split("_"))
    // requirement 1: total number of clicks per category
    // approach: filter -> map to (categoryId, 1) -> reduceByKey
    val require1RDD: RDD[(String, Int)] = source.filter(fields => fields(6) != "-1").map(fields => (fields(6), 1)).reduceByKey(_ + _)
    require1RDD.collect().foreach(println)

    // requirement 2: total number of orders per category (field 8 is a comma-separated category list)
    // approach: filter -> flatMap -> reduceByKey
    val require2RDD: RDD[(String, Int)] = source.filter(fields => fields(8) != "null").flatMap(t => t(8).split(","))
      .map((_, 1)).reduceByKey(_ + _)
    require2RDD.collect().foreach(println)

    // requirement 3: total number of payments per category (field 10, also comma-separated)
    val require3RDD: RDD[(String, Int)] = source.filter(fields => fields(10) != "null").flatMap(t => t(10).split(","))
      .map((_, 1)).reduceByKey(_ + _)
    require3RDD.collect().foreach(println)

    // join the three metrics per category and sort by (clicks, orders, payments) descending
    val result: RDD[(String, Int, Int, Int)] = require1RDD.join(require2RDD).join(require3RDD).map {
      case (a, ((b, c), d)) => (a, b, c, d)
    }.sortBy(t => (t._2, t._3, t._4), false)
    result.collect().foreach(println)
  }
}
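Because source feeds three independent requirement pipelines, each job re-reads and re-parses the input file. A hedged optimization, assuming the parsed records fit in the available memory, is to cache the RDD right where it is created:

// Persist the parsed lines so the three requirement jobs reuse them instead of re-reading the file.
val source: RDD[Array[String]] = sc.textFile("Day07/input/user_visit_action.txt").map(_.split("_")).cache()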

Operator practice 1.2 [case-if multi-level filtering and multi-level sorting]

/**
 * @author Jinxin Li
 * @create 2020-11-02 19:57
 */
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Case1_2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("project1.2")
    val sc: SparkContext = new SparkContext(conf)
    val source: RDD[Array[String]] = sc.textFile("Day07/input/user_visit_action.txt").map(_.split("_"))
    // single pass: flatMap -> reduceByKey -> regroup per category -> sort
    source.flatMap {
      case info if info(6) != "-1" => Array(((info(6), "click"), 1))
      case info if info(8) != "null" => info(8).split(",").map(t => ((t, "order"), 1))
      case info if info(10) != "null" => info(10).split(",").map(t => ((t, "pay"), 1))
      case _ => Nil
    }.reduceByKey(_ + _).map {
      case ((cateId, action), count) => (cateId, (action, count))
    }.groupByKey.mapValues(t => {
      val map: Map[String, Int] = t.toMap
      (map.getOrElse("click", 0), map.getOrElse("order", 0), map.getOrElse("pay", 0))
    }).sortBy(_._2, false).collect().foreach(println)
  }
}

Operator practice 1.3 [case classes and extended case classes (important)]

Scala collections and generics

Operator practice 1.4, 1.5, 2, 3, and custom accumulators

Source code analysis

