Spark_SparkStreaming_Case


SparkStreaming_Case Summary

The utility classes are essential to study and memorize.

At the same time, programs should be written following an industrial-grade workflow.

Enterprise-style program structure

1. Configuration file

# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/spark_2020?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=123456

# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.topic=testTopic

2. Utility classes

2.1 JDBC utility class

package com.atguigu.utils

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:49
 */
object JDBCUtil {
  // Initialize the connection pool
  val dataSource: DataSource = init()

  // Method that initializes the connection pool
  def init(): DataSource = {
    val properties = new Properties()
    val config: Properties = PropertiesUtil.load("config.properties")
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", config.getProperty("jdbc.url"))
    properties.setProperty("username", config.getProperty("jdbc.user"))
    properties.setProperty("password", config.getProperty("jdbc.password"))
    properties.setProperty("maxActive", config.getProperty("jdbc.datasource.size"))
    // Use Druid to create the database connection pool
    val source: DataSource = DruidDataSourceFactory.createDataSource(properties)
    source
  }

  // Get a MySQL connection
  def getConnection: Connection = {
    dataSource.getConnection
  }

  // Execute a SQL statement (single-row insert/update)
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {

    var rtn = 0
    var pstmt: PreparedStatement = null

    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)

      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }

      rtn = pstmt.executeUpdate()

      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }

    rtn
  }

  // Check whether a row exists
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {

    var flag: Boolean = false
    var pstmt: PreparedStatement = null

    try {
      pstmt = connection.prepareStatement(sql)

      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }

      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }

    flag
  }

  // Fetch a single value from MySQL
  def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]): Long = {

    var result: Long = 0L
    var pstmt: PreparedStatement = null

    try {
      pstmt = connection.prepareStatement(sql)

      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }

      val resultSet: ResultSet = pstmt.executeQuery()

      while (resultSet.next()) {
        result = resultSet.getLong(1)
      }

      resultSet.close()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }

    result
  }

  // Main method, used to test the methods above
  def main(args: Array[String]): Unit = {

    // 1. Get a connection
    val connection: Connection = getConnection

    // 2. Precompile the SQL
    val statement: PreparedStatement = connection.prepareStatement("select * from user_ad_count where userid = ?")

    // 3. Bind the parameter
    statement.setObject(1, "a")

    // 4. Execute the SQL
    val resultSet: ResultSet = statement.executeQuery()

    // 5. Fetch the data
    while (resultSet.next()) {
      println("111:" + resultSet.getString(1))
    }

    // 6. Close resources
    resultSet.close()
    statement.close()
    connection.close()
  }
}
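Below is a short usage sketch (not part of the original project) showing how these helpers are typically combined; the connection is closed in a finally block so it always goes back to the Druid pool. The user_ad_count table and its columns are the ones assumed by the handlers later in this post.

import java.sql.Connection

import com.atguigu.utils.JDBCUtil

// Hypothetical demo object, for illustration only
object JDBCUtilDemo {
  def main(args: Array[String]): Unit = {
    val connection: Connection = JDBCUtil.getConnection
    try {
      // Upsert one row, then read it back (same SQL pattern as the handlers below)
      JDBCUtil.executeUpdate(
        connection,
        "insert into user_ad_count (dt, userid, adid, count) values (?,?,?,?) " +
          "on duplicate key update count = count + ?",
        Array("2020-11-25", "a", "1", 1L, 1L))

      val count: Long = JDBCUtil.getDataFromMysql(
        connection,
        "select count from user_ad_count where dt=? and userid=? and adid=?",
        Array("2020-11-25", "a", "1"))
      println(s"current count: $count")
    } finally {
      // Pair getConnection with close (see the note in section 5)
      connection.close()
    }
  }
}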

2.2 Kafka utility class

package com.atguigu.utils

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:28
 */
object MyKafkaUtil {

  // Load the configuration object
  private val properties = PropertiesUtil.load("config.properties")
  // Broker addresses used to initialize the connection to the cluster
  private val brokers: String = properties.getProperty("kafka.broker.list")

  // Create a DStream that returns the received data
  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val kafkaPara = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "commerce-consumer-group" // consumer group
    )
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](Array(topic), kafkaPara))
    kafkaDStream
  }
}

2.3 Properties file utility class

package com.atguigu.utils

import java.io.InputStreamReader
import java.util.Properties

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:15
 */
object PropertiesUtil {
  def main(args: Array[String]): Unit = {

  }
  def load(name: String): Properties = {
    val properties = new Properties()
    properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(name)))
    properties
  }
}

3. Log generation and weighted random numbers

package com.atguigu.dataproduction

import java.util.Properties

import com.atguigu.utils.PropertiesUtil
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:24
 */
// Purpose: produce a batch of mock data and send it to Kafka every two seconds

// City info table: city_id: city id, city_name: city name, area: region the city belongs to
case class CityInfo(city_id: Long, city_name: String, area: String)

object MockerRealTime {
  /**
   * Mock data
   * Format: timestamp area city userid adid
   * a point in time, an area, a city, a user, an ad
   * 1604229363531 华北 北京 3 3
   */
  def generateMockData(): Array[String] = {

    val array: ArrayBuffer[String] = ArrayBuffer[String]()

    val CityRandomOpt = RandomOptions(
      RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10)
    )

    val random = new Random()

    // Simulate real-time data:
    // timestamp province city userid adid
    for (i <- 0 to 50) {

      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)

      // Concatenate the record: timestamp area city userid adid
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }

    array.toArray
  }

  def main(args: Array[String]): Unit = {

    // Read the Kafka configuration parameters from config.properties
    val config: Properties = PropertiesUtil.load("config.properties")
    val brokers: String = config.getProperty("kafka.broker.list")
    val topic: String = config.getProperty("kafka.topic")

    // Create the producer configuration object
    val prop = new Properties()

    // Add configuration entries
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Create the Kafka producer from the configuration
    val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)

    while (true) {

      // Randomly generate real-time data and send it to the Kafka cluster via the producer
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)
      }

      Thread.sleep(2000)
    }
  }
}
package com.atguigu.dataproduction

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:23
 */
// The weight with which a value appears, e.g. (男, 8), (女, 2)
case class RanOpt[T](value: T, weight: Int)

object RandomOptions {

  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {

    val randomOptions = new RandomOptions[T]()

    for (opt <- opts) {
      // Accumulate the total weight: 8 + 2
      randomOptions.totalWeight += opt.weight

      // Store each value into the buffer according to its weight: the larger the weight, the more copies are stored
      for (i <- 1 to opt.weight) {
        // 男 男 男 男 男 男 男 男 女 女
        randomOptions.optsBuffer += opt.value
      }
    }

    randomOptions
  }

  def main(args: Array[String]): Unit = {

    for (i <- 1 to 10) {
      println(RandomOptions(RanOpt("男", 8), RanOpt("女", 2)).getRandomOpt)
    }
  }
}

class RandomOptions[T](opts: RanOpt[T]*) {

  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  def getRandomOpt: T = {
    // Randomly pick an index in [0, totalWeight), e.g. 0-9
    val randomNum: Int = new Random().nextInt(totalWeight)
    // Use the random number as an index into the buffer
    optsBuffer(randomNum)
  }
}
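The buffer approach above stores weight copies of every value, which is simple but uses memory proportional to the total weight. A sketch of an alternative (not from the original) that walks cumulative weights instead, reusing the RanOpt case class defined above:

import scala.util.Random

// Hypothetical alternative: pick by cumulative weight instead of expanding a buffer.
// Memory is O(number of options) rather than O(total weight).
class WeightedRandom[T](opts: RanOpt[T]*) {
  private val totalWeight: Int = opts.map(_.weight).sum

  def getRandomOpt: T = {
    // Random number in [0, totalWeight), then walk the options subtracting weights
    var remaining: Int = new Random().nextInt(totalWeight)
    var i = 0
    while (remaining >= opts(i).weight) {
      remaining -= opts(i).weight
      i += 1
    }
    opts(i).value
  }
}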

4. Blacklist business logic

Implement a real-time, dynamic blacklist mechanism: blacklist any user who clicks a given ad more than 30 times in one day.

Note: the blacklist is stored in MySQL.
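The original post does not show the MySQL table definitions. A plausible setup sketch, inferred from the SQL used by the handlers below (the column names dt, userid, adid, count and the ON DUPLICATE KEY clauses imply unique keys on (dt, userid, adid) and on userid); the column types are assumptions:

import java.sql.{Connection, Statement}

import com.atguigu.utils.JDBCUtil

// Hypothetical one-off setup object; table and column types are assumed
object CreateTables {
  def main(args: Array[String]): Unit = {
    val connection: Connection = JDBCUtil.getConnection
    val statement: Statement = connection.createStatement()

    // Daily click count per user per ad; the composite key makes the upsert work
    statement.executeUpdate(
      """
        |CREATE TABLE IF NOT EXISTS user_ad_count (
        |  dt     VARCHAR(10),
        |  userid VARCHAR(20),
        |  adid   VARCHAR(20),
        |  count  BIGINT,
        |  PRIMARY KEY (dt, userid, adid)
        |)
        |""".stripMargin)

    // Blacklisted user ids; userid as primary key makes "on duplicate key update" a no-op
    statement.executeUpdate(
      """
        |CREATE TABLE IF NOT EXISTS black_list (
        |  userid VARCHAR(20) PRIMARY KEY
        |)
        |""".stripMargin)

    statement.close()
    connection.close()
  }
}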

package com.atguigu.app

import java.util.Properties

import com.atguigu.handler.BlackListHandler
import com.atguigu.utils.{MyKafkaUtil, PropertiesUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
 * @author Jinxin Li
 * @create 2020-11-25 20:42
 */
object RealTimeApp {

  def main(args: Array[String]): Unit = {
    // 1. Create SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("RealTimeApp").setMaster("local[*]")

    // 2. Create StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 3. Read the Kafka topic name from the configuration file
    val properties: Properties = PropertiesUtil.load("config.properties")
    val topic: String = properties.getProperty("kafka.topic")


    // Read data from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc)

    // Convert the data read from Kafka into case class objects
    val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(
      record => {
        // Kafka records are key-value pairs; take out the value first
        val line: String = record.value()
        // Split out the fields
        val info: Array[String] = line.split(" ")
        // Wrap and return the object directly via the companion object's apply method
        Ads_log(info(0).toLong, info(1), info(2), info(3), info(4))
      }
    )
    //-----------------------------------------------------------
    // Everything is now in place; start handling the requirements
    // Requirement 1: BlackListHandler, the ad blacklist business.
    // Implements the real-time dynamic blacklist: blacklist users who click a given ad more than 30 times per day.
    val filterAdsLogDStream: DStream[Ads_log] = BlackListHandler.filterByBlackList(adsLogDStream)
    BlackListHandler.addBlackList(filterAdsLogDStream)

    filterAdsLogDStream.cache()
    filterAdsLogDStream.count().print()

    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
// Case class representing one input record
// timestamp area city userid adid
case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)

package com.atguigu.handler

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import com.atguigu.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

/**
 * @author Jinxin Li
 * @create 2020-11-25 20:39
 */
object BlackListHandler {
  // Implement the real-time dynamic blacklist: blacklist users who click a given ad more than 30 times per day.
  // Note: the blacklist is stored in MySQL.
  private val sdf = new SimpleDateFormat("yyyy-MM-dd")

  def addBlackList(filterAdsLogDStream: DStream[Ads_log]): Unit = {
    val dtAreaCityUserAdDStream: DStream[((String, String, String), Long)] = filterAdsLogDStream.map {
      case adsLog => {
        val dt: String = sdf.format(new Date(adsLog.timestamp))
        ((dt, adsLog.userid, adsLog.adid), 1L)
      }
    }.reduceByKey(_ + _)

    // Write the counts out to MySQL
    dtAreaCityUserAdDStream.foreachRDD(
      // foreachRDD on the stream first, then write out per partition
      rdd=>rdd.foreachPartition(
        iter=>{
          val connection: Connection = JDBCUtil.getConnection
          iter.foreach{
            case ((dt,userid,adid),count)=>
              JDBCUtil.executeUpdate(
                connection,
                """
                  |insert into user_ad_count (dt,userid,adid,count)
                  |values (?,?,?,?)
                  |on duplicate key
                  |update count = count + ?
                  |""".stripMargin,Array(dt,userid,adid,count,count)
              )
              val ct: Long = JDBCUtil.getDataFromMysql(
                connection,
                """
                  |select count from user_ad_count where dt=? and userid=? and adid=?
                  |""".stripMargin,Array(dt,userid,adid)
              )
              // If the count reaches 30 or more, add the user to the blacklist
              if (ct>=30){
                JDBCUtil.executeUpdate(
                  connection,
                  """
                    |insert into black_list (userid)
                    |values (?)
                    |on duplicate key
                    |update userid=?
                    |""".stripMargin,Array(userid,userid)
                )
              }
          }
          connection.close()
        }
      )
    )
  }
  def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
    adsLogDStream.filter(
      adsLog=>{
        // Get a connection
        val connection: Connection = JDBCUtil.getConnection
        val bool: Boolean = JDBCUtil.isExist(
          connection,
          """
            |select * from black_list where userid=?
            |""".stripMargin, Array(adsLog.userid)
        )
        connection.close()
        !bool
      }
    )
  }
}
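Note that filterByBlackList above opens and closes a JDBC connection for every single record. A sketch (not in the original) of a per-partition variant that reuses one connection per partition; the filtering logic is otherwise identical:

import java.sql.Connection

import com.atguigu.app.Ads_log
import com.atguigu.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

// Hypothetical per-partition version of filterByBlackList
object BlackListFilterPerPartition {
  def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
    adsLogDStream.transform { rdd =>
      rdd.mapPartitions { iter =>
        // One connection per partition instead of one per record
        val connection: Connection = JDBCUtil.getConnection
        // Materialize before closing the connection: iterators are evaluated lazily
        val kept = iter.filter { adsLog =>
          !JDBCUtil.isExist(
            connection,
            "select * from black_list where userid=?",
            Array(adsLog.userid))
        }.toList
        connection.close()
        kept.iterator
      }
    }
  }
}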

5. Count total click traffic

Description: in real time, count the total daily click traffic per area, per city, and per ad, and store it in MySQL.

Note: always write connection and connection.close together; forgetting the close leads to connection errors.

Handler: DateAreaCityAdCountHandler

package com.atguigu.handler

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import com.atguigu.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

/**
 * @author Jinxin Li
 * @create 2020-11-25 23:58
 * Count the daily total ad clicks per region and per city and save them to MySQL
 */
object DateAreaCityAdCountHandler {
  private val sdf = new SimpleDateFormat("yyyy-MM-dd")
  def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit = {
    val dateAreaCityAdToCount = filterAdsLogDStream.map(ads_log => {
      // a. Format the timestamp as a date string
      val dt: String = sdf.format(new Date(ads_log.timestamp))
      // b. Combine into a key and return
      ((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
    }).reduceByKey(_ + _)
    dateAreaCityAdToCount.foreachRDD(
      // Per-partition processing: the writes run on the executors
      rdd=>{
        rdd.foreachPartition(
          iter=>{
            val connection: Connection = JDBCUtil.getConnection
            iter.foreach {
              case ((dt, area, city, adid), ct) =>
                // Upsert the count for (dt, area, city, adid); the column names beyond dt
                // are assumed here, following the user_ad_count pattern above
                JDBCUtil.executeUpdate(
                  connection,
                  """
                    |INSERT INTO area_city_ad_count (dt, area, city, adid, count)
                    |VALUES(?,?,?,?,?)
                    |ON DUPLICATE KEY
                    |UPDATE count = count + ?
                    """.stripMargin,
                  Array(dt, area, city, adid, ct, ct)
                )
            }
            connection.close()
          })
      })
  }
}

6. Count ad clicks within the last two minutes

LastHourAdCountHandler

Note: to save time during actual testing, the window counts ad clicks over the last 2 minutes instead of the last hour.


package com.atguigu.handler

import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

object LastHourAdCountHandler {

  // Time formatter
  private val sdf: SimpleDateFormat = new SimpleDateFormat("HH:mm")

  // From the filtered data set, count per-minute ad clicks over the last hour (2 minutes here)
  def getAdHourMintToCount(filterAdsLogDStream: DStream[Ads_log]): DStream[(String, List[(String, Long)])] = {

    // 1. Open a window => one hour in production (2 minutes here) with window()
    val windowAdsLogDStream: DStream[Ads_log] = filterAdsLogDStream.window(Minutes(2))

    // 2. Transform the structure: ads_log => ((adid,hm),1L) with map()
    val adHmToOneDStream: DStream[((String, String), Long)] = windowAdsLogDStream.map(adsLog => {

      val hm: String = sdf.format(new Date(adsLog.timestamp))

      ((adsLog.adid, hm), 1L)
    })

    // 3. Sum the counts: ((adid,hm),1L) => ((adid,hm),sum) with reduceByKey(_+_)
    val adHmToCountDStream: DStream[((String, String), Long)] = adHmToOneDStream.reduceByKey(_ + _)

    // 4. Transform the structure: ((adid,hm),sum) => (adid,(hm,sum)) with map()
    val adToHmCountDStream: DStream[(String, (String, Long))] = adHmToCountDStream.map { case ((adid, hm), count) =>
      (adid, (hm, count))
    }

    // 5. Group by adid: (adid,(hm,sum)) => (adid,Iter[(hm,sum),...]) with groupByKey, then sort each list by time
    adToHmCountDStream
      .groupByKey()
      .mapValues(iter => iter.toList.sortWith(_._1 < _._1))
  }
}
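The RealTimeApp in section 4 only wires in the blacklist handler. A sketch of how requirements 5 and 6 would presumably be called from the same main method once filterAdsLogDStream has been built (this glue code is an assumption, not shown in the original):

import com.atguigu.app.Ads_log
import com.atguigu.handler.{DateAreaCityAdCountHandler, LastHourAdCountHandler}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical glue code for requirements 5 and 6
object RemainingRequirements {
  def handle(filterAdsLogDStream: DStream[Ads_log]): Unit = {
    // Requirement 5: save daily (area, city, ad) click totals to MySQL
    DateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)

    // Requirement 6: per-minute click counts over the last 2 minutes, printed to the console
    val adToHmCountListDStream: DStream[(String, List[(String, Long)])] =
      LastHourAdCountHandler.getAdHourMintToCount(filterAdsLogDStream)
    adToHmCountListDStream.print()
  }
}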
