Operators
reduceByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
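Here every key is unique, so nothing is actually merged. A minimal sketch with hypothetical duplicate keys, where values for the same key are summed and the optional second argument sets the number of result partitions:
val wordPairs = sparkContext.makeRDD(List(("a", 1), ("a", 2), ("b", 3)))
wordPairs.reduceByKey(_ + _).collect().foreach(println) // (a,3), (b,3)
wordPairs.reduceByKey(_ + _, 2)                         // same result, written into 2 partitions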
aggregateByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
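Here both functions are _+_ with a zero value of 0, so the call behaves like reduceByKey. A minimal sketch with hypothetical data where the intra-partition and inter-partition functions differ (max inside each partition, then sum across partitions):
val pairs = sparkContext.makeRDD(List(("a", 1), ("a", 4), ("a", 3), ("b", 2)), 2)
// seqOp: max per key within a partition; combOp: sum the per-partition maxima
pairs.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
// assuming the 4 elements split 2/2, the partition maxima for "a" are 4 and 3, giving (a,7), (b,2)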
combineByKey
/**
 * @author Jinxin Li
 * @create 2020-10-30 8:53
 * First parameter: the initial value, passed as a function — the first value of each key within a partition is converted into the seed by this function
 * Second parameter: the intra-partition aggregation function
 * Third parameter: the inter-partition aggregation function, same shape as the previous one
 *
 * Without a shuffle the data keeps its original order
 * Requirement: within a partition add the string lengths, across partitions multiply them
 */
import org.apache.spark.{SparkConf, SparkContext}

object CombineByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[3]").setAppName("project1"))
    val list = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (1, "b")) // (1,"b") falls into the second partition; it is the third element there, but the first with key 1, so it seeds the initial value for key 1 in that partition
    val input = sc.makeRDD(list, 2)
    val combineRdd = input.combineByKey(
      x => x.length, // createCombiner: turns the first value of each key in a partition into the seed
      // "first value" here means the first value seen for that key within the partition
      (len: Int, str: String) => len + str.length, // mergeValue: intra-partition aggregation; len is the seed produced above
      (len1: Int, len2: Int) => len1 * len2        // mergeCombiners: inter-partition aggregation; combineByKey is not curried, so the types cannot be inferred from the seed and must be annotated
    )
    combineRdd.saveAsTextFile("Day05/output")
  }
}
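Assuming makeRDD splits the six elements evenly, partition 0 holds (1,"aa"), (1,"b"), (1,"a") and partition 1 holds (2,"ccc"), (2,"95"), (1,"b"). For key 1: partition 0 seeds with "aa".length = 2 and adds 1 + 1 to reach 4; partition 1 seeds with "b".length = 1; merging gives 4 * 1 = 4. For key 2: 3 + 2 = 5. The saved output should therefore contain (1,4) and (2,5).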
sortByKey
Stable sort: records with equal keys keep their relative order
Common time-complexity classes: O(c), O(n), O(n log n), O(n²)
import org.apache.spark.{SparkConf, SparkContext}

object SortByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("project1"))
    val list = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (1, "b"))
    val input = sc.makeRDD(list, 2)
    // Sorts by key, ascending by default
    input.sortByKey().collect().foreach(println)
  }
}
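A minimal sketch of a descending sort on the same input; the ascending flag is the first parameter, and an optional second parameter sets the number of output partitions:
input.sortByKey(ascending = false).collect().foreach(println) // keys with 2 come first, then 1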
join
import org.apache.spark.{SparkConf, SparkContext}

object Join {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("project1"))
    val list = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (1, "b"))
    val input1 = sc.makeRDD(list, 2)
    val list2 = List(
      (1, "aa"), (1, "b"), (1, "a"),
      (2, "ccc"), (2, "95"), (3, "b"))
    val input2 = sc.makeRDD(list2, 2)
    input1.join(input2).collect().foreach(println)
    // join matches equal keys: input1 has 4 records with key 1 and input2 has 3,
    // so the join produces 4 * 3 = 12 records for key 1
    // join is an inner join, so key 3 (present only in input2) disappears from the result
  }
}
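Because join is an inner join, keys present in only one RDD are dropped. A minimal sketch (same input1/input2) of the outer variants, which keep unmatched keys by wrapping the missing side in Option:
input1.leftOuterJoin(input2).collect().foreach(println) // RDD[(Int, (String, Option[String]))]; keeps every key of input1
input1.fullOuterJoin(input2).collect().foreach(println) // RDD[(Int, (Option[String], Option[String]))]; key 3 appears as (3,(None,Some(b)))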
partitionBy (with some notes on partitioning)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object PartitionBy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("project1"))
    val rdd = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc"), (1, "aaa"), (2, "bbb"), (3, "ccc")), 3)
    /*import org.apache.spark.HashPartitioner
    val rdd2 = rdd.partitionBy(new HashPartitioner(3))*/
    // makeRDD partitioning: by default the data is sliced by range according to the number of cores;
    // after applying a HashPartitioner the records are redistributed by key hash
    val fileRDD: RDD[String] = sc.textFile("Day05/input")
    fileRDD.saveAsTextFile("Day05/output")
    // textFile defaults to a minimum of 2 partitions
    // splits follow the HDFS blocks by default, but there are always at least 2 partitions
  }
}
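A minimal sketch (continuing inside the main above) that applies the HashPartitioner from the commented-out lines and tags each record with its partition index via mapPartitionsWithIndex, to check where each key lands:
import org.apache.spark.HashPartitioner
val rdd2 = rdd.partitionBy(new HashPartitioner(3))
rdd2.mapPartitionsWithIndex((idx, iter) => iter.map(kv => (idx, kv)))
  .collect().foreach(println) // records whose key hashes to the same value modulo 3 share a partition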
count
import org.apache.spark.{SparkConf, SparkContext}

// The defining trait of an action is that it returns a result that is not an RDD and triggers the job to execute
object count {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("project1"))
    val rdd = sc.textFile("Day02/input/word.txt")
    // Returns the number of elements in the RDD
    val countResult: Long = rdd.count()
    println(countResult)
  }
}
cogroup
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object cogroup {
  // cogroup groups the values of each key across both RDDs; each RDD's values for a key form one buffer (Iterable)
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("project1"))
    val dataRDD1 = sc.makeRDD(List(("a", 1), ("a", 2), ("c", 3)))
    val dataRDD2 = sc.makeRDD(List(("a", 1), ("c", 2), ("c", 3)))
    val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)
    value.foreach(println)
    //(a,(CompactBuffer(1, 2),CompactBuffer(1)))
    //(c,(CompactBuffer(3),CompactBuffer(2, 3)))
  }
}
sc.longAccumulator
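A minimal sketch (hypothetical sum example, object name LongAccumulatorDemo is illustrative) of sc.longAccumulator: the driver registers a named accumulator, tasks add to it inside an action, and the driver reads .value afterwards.
import org.apache.spark.{SparkConf, SparkContext}

object LongAccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("project1"))
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
    val sum = sc.longAccumulator("sum") // registered on the driver
    rdd.foreach(x => sum.add(x))        // updated on the executors inside an action
    println(sum.value)                  // 15, read back on the driver
  }
}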
Operator practice 1.1
//6.2.1
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Case1_1 {
  def main(args: Array[String]): Unit = {
    val Conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("project")
    val sc: SparkContext = new SparkContext(Conf)
    val source: RDD[Array[String]] = sc.textFile("Day07/input/user_visit_action.txt").map(_.split("_"))
    // Total clicks per category
    // Approach: filter -> map to (field(6), 1) -> reduceByKey
    val require1RDD: RDD[(String, Int)] = source.filter(fields => fields(6) != "-1").map(fields => (fields(6), 1)).reduceByKey(_ + _)
    require1RDD.collect().foreach(println)
    // Orders per category. Approach: filter -> flatMap -> reduceByKey
    val require2RDD: RDD[(String, Int)] = source.filter(fields => fields(8) != "null").flatMap(t => t(8).split(","))
      .map((_, 1)).reduceByKey(_ + _)
    require2RDD.collect().foreach(println)
    // Payments per category, same approach
    val require3RDD: RDD[(String, Int)] = source.filter(fields => fields(10) != "null").flatMap(t => t(10).split(","))
      .map((_, 1)).reduceByKey(_ + _)
    require3RDD.collect().foreach(println)
    val result: RDD[(String, Int, Int, Int)] = require1RDD.join(require2RDD).join(require3RDD).map {
      case (a, ((b, c), d)) => (a, b, c, d)
    }.sortBy(t => (t._2, t._3, t._4), false)
    result.collect().foreach(println)
  }
}
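Note that the triple join here is an inner join, so a category missing from any one of the three RDDs (for example clicked but never paid) is dropped from result; the flatMap/case approach in 1.2 below avoids the joins and fills missing action counts with 0 via getOrElse.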
Operator practice 1.2 [case-if: multi-level filtering and multi-level sorting]
/**
* @author Jinxin Li
* @create 2020-11-02 19:57
*/
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Case1_2 {
  def main(args: Array[String]): Unit = {
    val Conf1: SparkConf = new SparkConf().setMaster("local[2]").setAppName("project1.2")
    val sc: SparkContext = new SparkContext(Conf1)
    val source: RDD[Array[String]] = sc.textFile("Day07/input/user_visit_action.txt").map(_.split("_"))
    // flatMap with case/if guards -> reduceByKey -> map -> groupByKey -> sortBy
    source.flatMap {
      case info if info(6) != "-1" => Array(((info(6), "click"), 1))
      case info if info(8) != "null" => info(8).split(",").map(t => ((t, "order"), 1))
      case info if info(10) != "null" => info(10).split(",").map(t => ((t, "pay"), 1))
      case _ => Nil
    }.reduceByKey(_ + _).map {
      case ((cateId, action), count) => (cateId, (action, count))
    }.groupByKey.mapValues(t => {
      val map: Map[String, Int] = t.toMap
      (map.getOrElse("click", 0), map.getOrElse("order", 0), map.getOrElse("pay", 0))
    }).sortBy(_._2, false).collect().foreach(println)
  }
}
Operator practice 1.3 [case classes and extended case classes (important)]