Flink03_流处理API


0x0.执行环境Environment

0x1.Source

3.KafkaSource

直接上代码,从kafka读取数据然后进行处理

example

public class Flink04_Source_Kafka {
    public static void main(String[] args) throws Exception {

        //0x0 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //0x1 获取Kafka数据源
            // kafka配置项
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

            // 从kafka读取数据 要使用通用的方式,需要额外的jar包flink-connector-kafka-0.11_2.12
            // kafka - RichParallelSourceFunction 可以并行
        DataStreamSource<String> kafkaDS = env.addSource(
                new FlinkKafkaConsumer011<String>(
                        "test",
                        new SimpleStringSchema(),
                        properties)).setParallelism(2);
            //设置并行度为2测试并行能力
        SingleOutputStreamOperator<SensorReading> result = kafkaDS.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields = s.split(",");
                return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
            }
        });

        //0x2 打印数据
        result.print();

        //0x3 执行环境
        env.execute();

        //0x4 开启kafka生产者
        //bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test

    }
}

kafka开启与查看命令

#查看指定topic的详细信息
bin/kafka-topics.sh --zookeeper  hadoop102:2181/kafka --describe --topic test

#所有查看topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

#创建topic
bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 2 --partitions 10 --topic testTopic

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10 --topic testTopic

#往topic里生产数据
bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test

#消费topic
bin/kafka-console-consumer --bootstrap-server hadoop102:9092 --from-beginning  --topic register_topic

kafka分区策略

  1. 指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
  3. 既没有 partition 值又没有 key 值的情况下, kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,kafka再随机一个分区进行使用.

kafka分区分配策略

一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。

Kafka有三种分配策略,RoundRobin,Range , Sticky。

一个消费者组中的消费者可以消费一个Topic的不同分区

kafka集群查看策略

#连接zk
bin/zkCli.sh -timeout 5000 -r -server hadoop102:2181
#查看zookeeper集群下kafka集群下所有的brokers id列表
ls /kafka/brokers/ids
#查看2的broker的信息
get /kafka/brokers/ids/2

# 列出帮助文档,英文好的同学基本看帮助文档就可以指定大概怎么使用该命令了
kafka-topics --help
# 列出kafka集群下的所有topics,这里需要指定kafka机器元数据存储所在的zk机器地址,记得如果有namespace,要也加上,否则将连不上kafka集群
 kafka-topics --zookeeper hadoop102:2181/kafka --list
 
# 创建一个topic为test的topic,并指定分区数为5,副本数为1。这里的副本数不能超过broker的数量,否则会报错
kafka-topics --topic test --zookeeper hadoop102:2181/kafka --create --replication-factor 1 --partitions 5
# 创建时指定副本在哪个broker上,多个partition之间用逗号分隔,副本之间用":"分割,第一个副本默认是leader
kafka-topics.sh --zookeeper hadoop102:2181/kafka --topic test --create --replica-assignment 0:1,1:2,0:2

# 删除指定主题
kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test

# 将partition数量修改成7个
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
# 通过 --replica-assignment 参数指定新增partition的副本分布情况

# 如果原先的partition数量是3,那么新增的一个分区的副本分布应该在1002和1003
kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 0:1,1:2,0:2,2:3

# 修改topic test的配置 flush.ms =30000 。
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --config flush.ms=30000

# 删除topic test的 flush.ms 配置
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --delete-config flush.ms

==问题整理==

问题1:当我的一个模块依赖于另一个模块时,单模块如何打包

问题2:kafka增加并行度,相当于增加了同一个组的消费者么?

0x2 Operator

1.map

map算子

2.Filter

flink-filter算子

3.KeyBy

Flink-KeyBy

4.滚动聚合算子(Rolling Aggregation)


文章作者: Jinxin Li
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Jinxin Li !
  目录