Spark_SparkStreaming


云盘:miaochuanhai@163.com
暗号:大海哥最帅


一 概述

1.Spark架构

SparkStreaming整体架构图

2.DStream是什么?

每个时间区间收到的数据作为RDD存在

实时的封装

3.背压机制

可以自动调节receiver的接受速率

通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

二 DSteam入门

1.wordCount

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

1.1使用netcat作为源头wordcount

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
    //初始化配置信息
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //接收数据
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    //处理
    val wordToSumStream: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordToSumStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

1.2使用队列源计算wordcount

object queuePractice {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("opt")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    //创建queueInputDsteam
    val value: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)
    val result: DStream[Int] = value.reduce(_ + _)
    //打印结果
    result.print()
    ssc.start()
    for (i <- 1 to 5){
      rddQueue += ssc.sparkContext.makeRDD(1 to 5)
      Thread.sleep((2000))
    }
    ssc.awaitTermination()
  }
}

2.自定义数据源

2.1自定义数据源receiver

class defReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
  override def onStart(): Unit = {
    new Thread("Socket Receiver"){
      override def run(): Unit = {receive()}
    }.start()
  }
  def receive(): Unit = {
    val socket = new Socket(host, port)
    //创建一个bufferReader
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    //读取数据
    var input: String = reader.readLine()
    //当receiver没有关闭并且输入数据不为空,则循环发送数据给spark
    while(!isStopped()&&input !=null){
      store(input)
      input= reader.readLine()
    }
    //如果循环结束,则关闭资源
    reader.close()
    socket.close()
    //重启
    restart("restart")
  }
  override def onStop(): Unit = {}
}

2.2自定数据源的使用

object defSource {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("opt")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //创建自动receiver的Streaming
    val lineDStream: ReceiverInputDStream[String] = ssc.receiverStream(new defReceiver("hadoop102", 9999))
    //将每一行数据做切分,形成一个个单词
    val result: DStream[(String, Int)] = lineDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()
    //启动并且阻塞
    ssc.start()
    ssc.awaitTermination()
  }
}

3.kafka数据源

Kafka数据源在SparkStreaming中存在两种API,一种是ReceiverAPI,另一种是DirectAPI

ReceiverAPI已经弃用

DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制

3.1kafka的命令行代码

#查看消费topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka -list
#创建kafka的topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka  --create --replication-factor 1 --partitions 2 --topic testTopic
#查看Topic详情
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka \
--describe --topic testTopic
#创建生产者
bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic testTopic
#创建消费者
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic testTopic
#查看_consumer_offsets主题中存储的offset
bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group atguiguGroup
#如何查看消费者组的位置

3.2使用kafka源来流数据wordcount处理

object SparkStreaming_DirectAuto {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))

    //配置参数,为map
    //3.定义Kafka参数:kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    //使用DirectAPI创建DStream
    val kafkaToDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara))
    //处理数据
    kafkaToDStream.map(record=>record.value())
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .print()
    //关闭并堵塞
    scc.start()
    scc.awaitTermination()
  }
}

三 DStream转换

DStream上的操作与RDD的类似,分为转换和输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

3.1无状态转换

transfrom()是在Driver端开始执行的

//DStream其实就是内部封装了一个RDD,因此
object SparkStreaming_Transform {
  def main(args: Array[String]): Unit = {
    //初始化配置
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))
    //接受参数
    val rDStream: ReceiverInputDStream[String] = scc.socketTextStream("hadoop102", 9999)
    //使用transform可以将DStream转换成rdd
    println("主程序执行"+Thread.currentThread().getName)//JobGenerator Driver
    rDStream.transform(rdd=>{
      println("transform执行位置"+Thread.currentThread().getName)
      rdd.flatMap(line=>line.split(" ")).map((_,1)).reduceByKey(_+_)}).print()
    //关闭并堵塞
    scc.start()
    scc.awaitTermination()
  }
}

3.2有状态转换

有状态转化操作:计算当前批次RDD时,需要用到历史RDD的数据。

(1)UpdateStateByKey()

参数中需要传递一个函数,在函数内部根据需求对==当前数据==与==历史数据==进行整合

需求:使用UpdateStateByKey计算WordCount

/**
 * @author Jinxin Li
 * @create 2020-11-24 18:24
 * 通过累积历史状态做的实时的wordcount
 */
object sparkStreaming_updateStateByKey {
  def main(args: Array[String]): Unit = {

    //这里只会相加key相同的数值,因为是reduce,key相同进来,只要设定value的方法即可
    // 定义更新状态方法,参数seq为当前批次单词次数,state为以往批次单词次数
    val updateFunc = (seq:Seq[Int],state:Option[Int])=>{
      //当前批次
      val currentCount = seq.sum
      //历史批次
      val previousCount =  state.getOrElse(0)
      //总的数据累加
      Some(currentCount+previousCount)
    }
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))

    scc.checkpoint("./SparkStreaming/ck")//设定存储点,默认的计算与历史相关的都要设定检查点

    val reDStream: ReceiverInputDStream[String] = scc.socketTextStream("hadoop102", 9999)
    val KvDStream: DStream[(String, Int)] = reDStream.flatMap(_.split(" ")).map((_, 1))
    KvDStream.updateStateByKey[Int](
      updateFunc,
      new HashPartitioner(2)).print()
    //关闭并堵塞
    scc.start()
    scc.awaitTermination()
  }
}

缺点**

这种UpdateStateByKey在企业中几乎不用,因为存在很多缺点

  1. 使用checkPoint检查点存储记录历史会产生大量的小文件

1606214937943

  1. 当前线程关闭后,下次开启重新开始累积
  2. 即使重新开始累积,但是checkpoint会记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次,容易导致启动卡死

(2)WindowOperations

Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

窗口函数操作

需求

基本语法:window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的DStream。

需求:统计WordCount:3秒一个批次,窗口12秒,滑步6秒。

object SparkStreaming_window {
  def main(args: Array[String]): Unit = {
    //这里只会相加key相同的数值,因为是reduce,key相同进来,只要设定value的方法即可
    // 定义更新状态方法,参数seq为当前批次单词次数,state为以往批次单词次数
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))
    scc.checkpoint("./SparkStreaming/ck")//设定存储点,默认的计算与历史相关的都要设定检查点
    val reDStream: ReceiverInputDStream[String] = scc.socketTextStream("hadoop102", 9999)
    val lineToTuples: DStream[(String, Int)] = reDStream.flatMap(_.split(" ")).map((_, 1))
    //设定窗口,3秒一个批次,窗口持续12秒,步长6秒
    val windowDStream: DStream[(String, Int)] = lineToTuples.window(Seconds(12), Seconds(6))
    windowDStream.reduceByKey(_+_).print()
    //关闭并堵塞
    scc.start()
    scc.awaitTermination()
  }
}

窗口计算流程图

(3)reduceByKeyAndWindow

reduceByKeyAndWindow(func, windowLength, slideInterval,[numTasks]):

当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。

这个是window程序的简化版

object SparkStreaming_reduceByKeyAndWindow {
  def main(args: Array[String]): Unit = {
    //初始化配置
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))
    //接受参数
    val line: ReceiverInputDStream[String] = scc.socketTextStream("hadoop102", 9999)
    val tuplesDStream: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1))
    tuplesDStream.reduceByKeyAndWindow(
      (a:Int,b:Int)=>a+b,//这里要表明类型
      Seconds(6),
      Seconds(12)
    ).print()
    scc.start()
    scc.awaitTermination()
  }
}

(4)reduceByKeyAndWindow(反向Reduce)

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval,[numTasks])

这个是上一个函数的升级版

通过一个减法,减少统计窗口之间的重复部分

object SparkStreaming_reduceByKeyAndWindow_reduce {
  def main(args: Array[String]): Unit = {
    //初始化配置
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))
    //接受参数
    scc.checkpoint("./ck")
    val line: ReceiverInputDStream[String] = scc.socketTextStream("hadoop102", 9999)
    val tuplesDStream: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1))
    tuplesDStream.reduceByKeyAndWindow(
      (a:Int,b:Int)=>a+b,//这里要表明类型
      (x:Int,y:Int)=>x-y,
      Seconds(12),//这个是窗口间隔
      Seconds(6),//这个是滑动距离
      new HashPartitioner(2),
      (x:(String, Int)) => x._2 > 0//过滤小于0值
    ).print()
    scc.start()
    scc.awaitTermination()
  }
}

3.3其他操作

(1)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

(2)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

五 DSteam输出

企业中常用以下方法进行DSteam输出

print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。

foreachRDD(func):这是最通用的输出操作,函数func用于产生DStream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者写入数据库。

在企业开发中通常采用foreachRDD(),它用来对DStream中的RDD进行任意计算。这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到如MySQL的外部数据库中

object SparkStreaming_output {
  def main(args: Array[String]): Unit = {
    //初始化配置
    val sparkConf: SparkConf = new SparkConf().setAppName("KakfaSource").setMaster("local[*]")
    val scc = new StreamingContext(sparkConf, Seconds(3))
    //接受参数
    scc.checkpoint("./ck")
    val line: ReceiverInputDStream[String] = scc.socketTextStream("hadoop102", 9999)
    val tuplesDStream: DStream[(String, Int)] = line.flatMap(_.split(" ")).map((_, 1))
    //tuplesDStream.foreachRDD(rdd=>rdd.collect().foreach(println))
    //常规这样输出是一条一条的输出,但是常规不适合这样操作
    tuplesDStream.foreachRDD(rdd=>rdd.foreachPartition {
      println(Thread.currentThread().getName//需要注意的是,转换输出也是在Driver端执行的
      item => item.foreach(println)
    })
    scc.start()
    scc.awaitTermination()
  }
}

(1)连接不能写在Driver层面(序列化)

(2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

(3)增加foreachPartition,在分区创建(获取)。还是在Driver端进行?

六 优雅的关闭

使用外部文件系统来控制内部程序关闭

使用object

object close2 {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    val config = PropertiesUtil.load("config.properties")
    val sparkConf: SparkConf = new SparkConf().setAppName("sparkStreaming").setMaster("local[*]")
      .set("spark.streaming.stopGracefullyOnShutdown", config.getProperty("spark.streaming.stopGracefullyOnShutdown"))
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    lineDStream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    new Thread(new StopMonitor(ssc)).start()

    //关闭
    ssc.start()
    ssc.awaitTermination()
  }
}

关闭object

class StopMonitor(ssc: StreamingContext) extends Runnable{
  override def run(): Unit = {
    //获得文件系统
    val fileSystem: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), new Configuration(), "atguigu")
    //循环监测
    while (true){
      val bool: Boolean = fileSystem.exists(new Path("hdfs://hadoop102:8020/stopSpark"))
      Thread.sleep(5000)
      //判断
      if (bool){
        val state: StreamingContextState = ssc.getState()
        if (state==StreamingContextState.ACTIVE){
          ssc.stop(stopSparkContext = true,stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录