0x0.执行环境Environment
0x1.Source
3.KafkaSource
直接上代码,从kafka读取数据然后进行处理
example
public class Flink04_Source_Kafka {
public static void main(String[] args) throws Exception {
//0x0 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//0x1 获取Kafka数据源
// kafka配置项
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
// 从kafka读取数据 要使用通用的方式,需要额外的jar包flink-connector-kafka-0.11_2.12
// kafka - RichParallelSourceFunction 可以并行
DataStreamSource<String> kafkaDS = env.addSource(
new FlinkKafkaConsumer011<String>(
"test",
new SimpleStringSchema(),
properties)).setParallelism(2);
//设置并行度为2测试并行能力
SingleOutputStreamOperator<SensorReading> result = kafkaDS.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String s) throws Exception {
String[] fields = s.split(",");
return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
}
});
//0x2 打印数据
result.print();
//0x3 执行环境
env.execute();
//0x4 开启kafka生产者
//bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test
}
}
kafka开启与查看命令
#查看指定topic的详细信息
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --describe --topic test
#所有查看topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list
#创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 2 --partitions 10 --topic testTopic
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10 --topic testTopic
#往topic里生产数据
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test
#消费topic
bin/kafka-console-consumer --bootstrap-server hadoop102:9092 --from-beginning --topic register_topic
kafka分区策略
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
- 既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.
kafka分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有三种分配策略,RoundRobin,Range , Sticky。
一个消费者组中的消费者可以消费一个Topic的不同分区
kafka集群查看策略
#连接zk
bin/zkCli.sh -timeout 5000 -r -server hadoop102:2181
#查看zookeeper集群下kafka集群下所有的brokers id列表
ls /kafka/brokers/ids
#查看2的broker的信息
get /kafka/brokers/ids/2
# 列出帮助文档,英文好的同学基本看帮助文档就可以指定大概怎么使用该命令了
kafka-topics --help
# 列出kafka集群下的所有topics,这里需要指定kafka机器元数据存储所在的zk机器地址,记得如果有namespace,要也加上,否则将连不上kafka集群
kafka-topics --zookeeper hadoop102:2181/kafka --list
# 创建一个topic为test的topic,并指定分区数为5,副本数为1。这里的副本数不能超过broker的数量,否则会报错
kafka-topics --topic test --zookeeper hadoop102:2181/kafka --create --replication-factor 1 --partitions 5
# 创建时指定副本在哪个broker上,多个partition之间用逗号分隔,副本之间用":"分割,第一个副本默认是leader
kafka-topics.sh --zookeeper hadoop102:2181/kafka --topic test --create --replica-assignment 0:1,1:2,0:2
# 删除指定主题
kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test
# 将partition数量修改成7个
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
# 通过 --replica-assignment 参数指定新增partition的副本分布情况
# 如果原先的partition数量是3,那么新增的一个分区的副本分布应该在1002和1003
kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 0:1,1:2,0:2,2:3
# 修改topic test的配置 flush.ms =30000 。
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --config flush.ms=30000
# 删除topic test的 flush.ms 配置
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --delete-config flush.ms
==问题整理==
问题1:当我的一个模块依赖于另一个模块时,单模块如何打包
问题2:kafka增加并行度,相当于增加了同一个组的消费者么?