SparkStreaming_Case_Summary
The utility classes are the part you must learn and memorize.
At the same time, write the program following an industrial-style workflow: configuration file, utility classes, mock data producer, then the business handlers.
1. Configuration File
# JDBC configuration
jdbc.datasource.size=10
jdbc.url=jdbc:mysql://hadoop102:3306/spark_2020?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
jdbc.user=root
jdbc.password=123456
# Kafka configuration
kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092
kafka.topic=testTopic
2. Utility Classes
2.1 JDBC Utility Class
package com.atguigu.utils

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:49
 */
object JDBCUtil {
  // Initialize the connection pool
  val dataSource: DataSource = init()

  // Build the Druid connection pool from config.properties
  def init(): DataSource = {
    val properties = new Properties()
    val config: Properties = PropertiesUtil.load("config.properties")
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", config.getProperty("jdbc.url"))
    properties.setProperty("username", config.getProperty("jdbc.user"))
    properties.setProperty("password", config.getProperty("jdbc.password"))
    properties.setProperty("maxActive", config.getProperty("jdbc.datasource.size"))
    // Create the data source with Druid
    val source: DataSource = DruidDataSourceFactory.createDataSource(properties)
    source
  }

  // Get a MySQL connection from the pool
  def getConnection: Connection = {
    dataSource.getConnection
  }

  // Execute an SQL statement, e.g. a single-row insert or update
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  // Check whether a matching row exists
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    flag
  }

  // Fetch a single Long value from MySQL
  def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]): Long = {
    var result: Long = 0L
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      val resultSet: ResultSet = pstmt.executeQuery()
      while (resultSet.next()) {
        result = resultSet.getLong(1)
      }
      resultSet.close()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    result
  }

  // Main method for testing the methods above
  def main(args: Array[String]): Unit = {
    // 1. Get a connection
    val connection: Connection = getConnection
    // 2. Pre-compile the SQL
    val statement: PreparedStatement = connection.prepareStatement("select * from user_ad_count where userid = ?")
    // 3. Bind the parameter
    statement.setObject(1, "a")
    // 4. Execute the query
    val resultSet: ResultSet = statement.executeQuery()
    // 5. Read the data
    while (resultSet.next()) {
      println("111:" + resultSet.getString(1))
    }
    // 6. Release resources
    resultSet.close()
    statement.close()
    connection.close()
  }
}
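For quick reference, here is a minimal usage sketch of the three helpers above. The object name JDBCUtilDemo and the table demo_table with its columns are made up for illustration only and are not part of the project.
// Hypothetical usage sketch; demo_table and its columns are illustrative only
import com.atguigu.utils.JDBCUtil

object JDBCUtilDemo {
  def main(args: Array[String]): Unit = {
    val connection = JDBCUtil.getConnection
    // Upsert-style write, the same pattern the handlers in sections 4 and 5 use
    JDBCUtil.executeUpdate(
      connection,
      "insert into demo_table (id, cnt) values (?, ?) on duplicate key update cnt = cnt + ?",
      Array(1, 10L, 10L))
    // Existence check and single-value read
    val exists: Boolean = JDBCUtil.isExist(connection, "select * from demo_table where id = ?", Array(1))
    val cnt: Long = JDBCUtil.getDataFromMysql(connection, "select cnt from demo_table where id = ?", Array(1))
    println(s"exists = $exists, cnt = $cnt")
    connection.close()
  }
}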
2.2 Kafka Utility Class
package com.atguigu.utils

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:28
 */
object MyKafkaUtil {
  // Load the configuration file
  private val properties = PropertiesUtil.load("config.properties")

  // Broker list used to connect to the Kafka cluster
  private val brokers: String = properties.getProperty("kafka.broker.list")

  // Create a DStream that returns the received records
  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val kafkaPara = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "commerce-consumer-group" // consumer group
    )
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaPara))
    kafkaDStream
  }
}
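A minimal usage sketch of MyKafkaUtil, assuming the same local-mode StreamingContext setup used in section 4; MyKafkaUtilDemo is a made-up object name, and the complete wiring is RealTimeApp in section 4.
// Minimal usage sketch; the complete application is RealTimeApp in section 4
import com.atguigu.utils.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyKafkaUtilDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("MyKafkaUtilDemo").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    // Each element is a ConsumerRecord[String, String]; the log line itself is in value()
    MyKafkaUtil.getKafkaStream("testTopic", ssc).map(_.value()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}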
2.3 Properties Utility Class
package com.atguigu.utils

import java.io.InputStreamReader
import java.util.Properties

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:15
 */
object PropertiesUtil {

  def main(args: Array[String]): Unit = {
  }

  // Load a properties file from the classpath
  def load(name: String): Properties = {
    val properties = new Properties()
    properties.load(new InputStreamReader(Thread.currentThread().getContextClassLoader.getResourceAsStream(name)))
    properties
  }
}
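For reference, loading the configuration file from section 1 looks like the sketch below; the file is expected on the classpath (e.g. under src/main/resources), and PropertiesUtilDemo is a made-up object name.
// Usage sketch: read the keys defined in section 1; the file must be on the classpath
import com.atguigu.utils.PropertiesUtil

object PropertiesUtilDemo {
  def main(args: Array[String]): Unit = {
    val config = PropertiesUtil.load("config.properties")
    println(config.getProperty("jdbc.url"))
    println(config.getProperty("kafka.broker.list"))
    println(config.getProperty("kafka.topic"))
  }
}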
3. Log Generation and Weighted Random Selection
package com.atguigu.dataproduction

import java.util.Properties

import com.atguigu.utils.PropertiesUtil
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:24
 */
// Sends a batch of mock data to Kafka every two seconds
// City info table: city_id: city id, city_name: city name, area: region the city belongs to
case class CityInfo(city_id: Long, city_name: String, area: String)

object MockerRealTime {
  /**
   * Mock data
   * Format: timestamp area city userid adid
   *         some time, some area, some city, some user, some ad
   * e.g. 1604229363531 华北 北京 3 3
   */
  def generateMockData(): Array[String] = {
    val array: ArrayBuffer[String] = ArrayBuffer[String]()
    val CityRandomOpt = RandomOptions(
      RanOpt(CityInfo(1, "北京", "华北"), 30),
      RanOpt(CityInfo(2, "上海", "华东"), 30),
      RanOpt(CityInfo(3, "广州", "华南"), 10),
      RanOpt(CityInfo(4, "深圳", "华南"), 20),
      RanOpt(CityInfo(5, "天津", "华北"), 10)
    )
    val random = new Random()
    // Mock real-time data:
    // timestamp province city userid adid
    for (i <- 0 to 50) {
      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)
      // Concatenate one record: timestamp area city userid adid
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }
    array.toArray
  }

  def main(args: Array[String]): Unit = {
    // Read the Kafka settings from config.properties
    val config: Properties = PropertiesUtil.load("config.properties")
    val brokers: String = config.getProperty("kafka.broker.list")
    val topic: String = config.getProperty("kafka.topic")
    // Create the producer configuration
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Create the Kafka producer from the configuration
    val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)
    while (true) {
      // Generate mock records and send them to the Kafka cluster through the producer
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
        println(line)
      }
      Thread.sleep(2000)
    }
  }
}
package com.atguigu.dataproduction

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * @author Jinxin Li
 * @create 2020-11-25 19:23
 */
// A value together with the weight it should appear with, e.g. (男, 8) (女, 2)
case class RanOpt[T](value: T, weight: Int)

object RandomOptions {
  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    for (opt <- opts) {
      // Accumulate the total weight: 8 + 2
      randomOptions.totalWeight += opt.weight
      // Store each value in the buffer as many times as its weight; a higher weight means more copies
      for (i <- 1 to opt.weight) {
        // 男 男 男 男 男 男 男 男 女 女
        randomOptions.optsBuffer += opt.value
      }
    }
    randomOptions
  }

  def main(args: Array[String]): Unit = {
    for (i <- 1 to 10) {
      println(RandomOptions(RanOpt("男", 8), RanOpt("女", 2)).getRandomOpt)
    }
  }
}

class RandomOptions[T](opts: RanOpt[T]*) {
  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  def getRandomOpt: T = {
    // Pick a random index in [0, totalWeight), i.e. 0-9 in the example above
    val randomNum: Int = new Random().nextInt(totalWeight)
    // Use the random number as an index into the buffer
    optsBuffer(randomNum)
  }
}
4. Blacklist Business
Implement a real-time dynamic blacklist: blacklist any user who clicks a given ad more than 30 times in a single day.
Note: the blacklist is stored in MySQL.
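The course material does not show the MySQL DDL for the two tables this requirement writes to. The sketch below is an assumption inferred from the column names and the ON DUPLICATE KEY UPDATE statements in the handler further down: the unique keys are guesses, but without them the upserts would simply keep inserting new rows. The object name CreateTables is made up.
// Assumed table definitions (not given in the original material); inferred from the upsert logic below
import com.atguigu.utils.JDBCUtil

object CreateTables {
  def main(args: Array[String]): Unit = {
    val connection = JDBCUtil.getConnection
    // Daily per-user, per-ad click counts; the composite primary key makes ON DUPLICATE KEY UPDATE work
    connection.createStatement().execute(
      """
        |CREATE TABLE IF NOT EXISTS user_ad_count (
        |  dt     VARCHAR(255),
        |  userid VARCHAR(255),
        |  adid   VARCHAR(255),
        |  count  BIGINT,
        |  PRIMARY KEY (dt, userid, adid)
        |)
        |""".stripMargin)
    // Blacklisted users; userid must be unique so repeated inserts stay idempotent
    connection.createStatement().execute(
      """
        |CREATE TABLE IF NOT EXISTS black_list (
        |  userid VARCHAR(255) PRIMARY KEY
        |)
        |""".stripMargin)
    connection.close()
  }
}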
package com.atguigu.app

import java.util.Properties

import com.atguigu.handler.BlackListHandler
import com.atguigu.utils.{MyKafkaUtil, PropertiesUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * @author Jinxin Li
 * @create 2020-11-25 20:42
 */
object RealTimeApp {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("RealTimeApp").setMaster("local[*]")
    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    //3. Read the topic name from config.properties
    val properties: Properties = PropertiesUtil.load("config.properties")
    val topic: String = properties.getProperty("kafka.topic")
    //4. Read data from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc)
    //5. Convert the records read from Kafka into case class objects
    val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(
      record => {
        // Kafka delivers key-value records; take the value first
        val line: String = record.value()
        // Split out the fields
        val info: Array[String] = line.split(" ")
        // Wrap the fields with the case class apply method and return the object
        Ads_log(info(0).toLong, info(1), info(2), info(3), info(4))
      }
    )
    //-----------------------------------------------------------
    // Everything is in place; start handling the requirements
    // Requirement 1: BlackListHandler, the ad blacklist business.
    // Implement a real-time dynamic blacklist: blacklist any user who clicks a given ad more than 30 times in one day.
    val filterAdsLogDStream: DStream[Ads_log] = BlackListHandler.filterByBlackList(adsLogDStream)
    BlackListHandler.addBlackList(filterAdsLogDStream)
    filterAdsLogDStream.cache()
    filterAdsLogDStream.count().print()
    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}

// Case class for one record
// timestamp area city userid adid
case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)
package com.atguigu.handler

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import com.atguigu.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

/**
 * @author Jinxin Li
 * @create 2020-11-25 20:39
 */
object BlackListHandler {
  // Implement a real-time dynamic blacklist: blacklist any user who clicks a given ad more than 30 times in one day.
  // Note: the blacklist is stored in MySQL.
  private val sdf = new SimpleDateFormat("yyyy-MM-dd")

  def addBlackList(filterAdsLogDStream: DStream[Ads_log]): Unit = {
    // Key each record by (date, userid, adid) and count the clicks within the batch
    val dtUserAdToCountDStream: DStream[((String, String, String), Long)] = filterAdsLogDStream.map(
      adsLog => {
        val dt: String = sdf.format(new Date(adsLog.timestamp))
        ((dt, adsLog.userid, adsLog.adid), 1L)
      }
    ).reduceByKey(_ + _)

    // Write the counts out to MySQL
    dtUserAdToCountDStream.foreachRDD(
      // foreachRDD on the stream first, then write out partition by partition
      rdd => rdd.foreachPartition(
        iter => {
          val connection: Connection = JDBCUtil.getConnection
          iter.foreach {
            case ((dt, userid, adid), count) =>
              // Accumulate the daily click count for this user and ad
              JDBCUtil.executeUpdate(
                connection,
                """
                  |insert into user_ad_count (dt,userid,adid,count)
                  |values (?,?,?,?)
                  |on duplicate key
                  |update count = count + ?
                  |""".stripMargin, Array(dt, userid, adid, count, count)
              )
              // Read back the accumulated count
              val ct: Long = JDBCUtil.getDataFromMysql(
                connection,
                """
                  |select count from user_ad_count where dt=? and userid=? and adid=?
                  |""".stripMargin, Array(dt, userid, adid)
              )
              // If the count reaches 30, add the user to the blacklist
              if (ct >= 30) {
                JDBCUtil.executeUpdate(
                  connection,
                  """
                    |insert into black_list (userid)
                    |values (?)
                    |on duplicate key
                    |update userid=?
                    |""".stripMargin, Array(userid, userid)
                )
              }
          }
          connection.close()
        }
      )
    )
  }

  def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
    adsLogDStream.filter(
      adsLog => {
        // Get a connection
        val connection: Connection = JDBCUtil.getConnection
        val bool: Boolean = JDBCUtil.isExist(
          connection,
          """
            |select * from black_list where userid=?
            |""".stripMargin, Array(adsLog.userid)
        )
        connection.close()
        !bool
      }
    )
  }
}
5. Total Click Traffic
Description: count, in real time, the total click traffic of each ad per day, per area and per city, and store it in MySQL.
Note: JDBCUtil.getConnection and connection.close must always be written as a pair, inside the same foreachPartition.
Handler: DateAreaCityAdCountHandler
package com.atguigu.handler

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import com.atguigu.utils.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

/**
 * @author Jinxin Li
 * @create 2020-11-25 23:58
 * Count the total ad clicks per day, per area and per city, and save them to MySQL
 */
object DateAreaCityAdCountHandler {
  private val sdf = new SimpleDateFormat("yyyy-MM-dd")

  def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit = {
    val dateAreaCityAdToCount: DStream[((String, String, String, String), Long)] = filterAdsLogDStream.map(ads_log => {
      //a. Format the timestamp as a date string
      val dt: String = sdf.format(new Date(ads_log.timestamp))
      //b. Combine the key and return
      ((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
    }).reduceByKey(_ + _)

    dateAreaCityAdToCount.foreachRDD(
      // foreachPartition: the writing below runs on the executors, one connection per partition
      rdd => {
        rdd.foreachPartition(
          iter => {
            val connection: Connection = JDBCUtil.getConnection
            iter.foreach {
              case ((dt, area, city, adid), ct) =>
                // Upsert the accumulated count for this (dt, area, city, adid) combination
                JDBCUtil.executeUpdate(
                  connection,
                  """
                    |INSERT INTO area_city_ad_count (dt,area,city,adid,count)
                    |VALUES(?,?,?,?,?)
                    |ON DUPLICATE KEY
                    |UPDATE count=count+?
                  """.stripMargin,
                  Array(dt, area, city, adid, ct, ct)
                )
            }
            connection.close()
          })
      })
  }
}
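RealTimeApp in section 4 only wires up requirement 1. A sketch of how this handler would be added, placed in RealTimeApp after the blacklist calls and before ssc.start(); it also assumes an area_city_ad_count table with a unique key on (dt, area, city, adid), analogous to user_ad_count above, so that the ON DUPLICATE KEY UPDATE takes effect.
// Sketch: add to RealTimeApp after BlackListHandler.addBlackList(...), before ssc.start()
// Requirement 2: total clicks per day/area/city/ad, written to the assumed area_city_ad_count table
DateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)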
6. Ad Click Counts Within the Last Two Minutes
LastHourAdCountHandler
Note: the real requirement is the last hour; during testing, to save time, a 2-minute window is used instead.
package com.atguigu.handler

import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

object LastHourAdCountHandler {
  // Time formatter: hour and minute
  private val sdf: SimpleDateFormat = new SimpleDateFormat("HH:mm")

  // From the filtered data, count per-minute clicks of each ad over the last hour (2 minutes here for testing)
  def getAdHourMintToCount(filterAdsLogDStream: DStream[Ads_log]): DStream[(String, List[(String, Long)])] = {
    //1. Open a window => nominally one hour, shortened to 2 minutes for testing  window()
    val windowAdsLogDStream: DStream[Ads_log] = filterAdsLogDStream.window(Minutes(2))
    //2. Reshape the data  ads_log => ((adid,hm),1L)  map()
    val adHmToOneDStream: DStream[((String, String), Long)] = windowAdsLogDStream.map(adsLog => {
      val hm: String = sdf.format(new Date(adsLog.timestamp))
      ((adsLog.adid, hm), 1L)
    })
    //3. Sum the counts  ((adid,hm),1L) => ((adid,hm),sum)  reduceByKey(_+_)
    val adHmToCountDStream: DStream[((String, String), Long)] = adHmToOneDStream.reduceByKey(_ + _)
    //4. Reshape again  ((adid,hm),sum) => (adid,(hm,sum))  map()
    val adToHmCountDStream: DStream[(String, (String, Long))] = adHmToCountDStream.map { case ((adid, hm), count) =>
      (adid, (hm, count))
    }
    //5. Group by adid  (adid,(hm,sum)) => (adid,Iter[(hm,sum),...])  groupByKey, then sort each list by time
    adToHmCountDStream
      .groupByKey()
      .mapValues(iter => iter.toList.sortWith(_._1 < _._1))
  }
}
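Like requirement 2, this handler is not called from RealTimeApp in section 4. A sketch of the wiring, again placed before ssc.start() inside RealTimeApp (where DStream is already imported); each batch then prints one (adid, List((HH:mm, count), ...)) entry per ad.
// Sketch: add to RealTimeApp before ssc.start()
// Requirement 3: per-minute clicks of each ad within the last window
val adToHmCountListDStream: DStream[(String, List[(String, Long)])] =
  LastHourAdCountHandler.getAdHourMintToCount(filterAdsLogDStream)
adToHmCountListDStream.print()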