
Overview

This article briefly introduces how to read data from and write data to Kafka with Flink.

Reading and Writing Kafka with Flink

Flink provides a Kafka connector for reading messages from and writing messages to Kafka topics. The Flink Kafka consumer integrates with Flink's checkpointing mechanism to provide exactly-once semantics: rather than relying solely on Kafka's consumer-group offset tracking, Flink maintains the offsets internally as part of its checkpoints.

Add the following dependency to your Maven project.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.7.1</version>
</dependency>

The component versions used throughout this article:

Component   Version
---------   -------
ZooKeeper   3.5.3
Kafka       1.1.0
Flink       1.7.1
Scala       2.11.8
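
Before diving into the consumer details, here is a minimal end-to-end sketch of reading from one topic and writing to another with the universal connector. The broker address, topic names, group id, and the map step are placeholders for illustration, not values from this article:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")  // placeholder broker address
props.setProperty("group.id", "my-group")                 // placeholder consumer group

// Read strings from one topic, transform them, and write them to another.
val source = new FlinkKafkaConsumer[String]("input-topic", new SimpleStringSchema(), props)
val sink = new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), props)

env.addSource(source)
  .map(_.toUpperCase)  // stand-in for real processing
  .addSink(sink)

env.execute("kafka-read-write")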

Kafka Consumer

Kafka Consumers Start Position Configuration

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// FlinkKafkaConsumer is the class in the universal connector declared above;
// version-specific connectors use suffixed names such as FlinkKafkaConsumer08.
val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)
...

All versions of the Flink Kafka consumer support the configuration calls shown above to control where message consumption starts.

  1. setStartFromGroupOffsets (default behaviour): Start reading partitions from the consumer group’s (group.id setting in the consumer properties) committed offsets in Kafka brokers (or Zookeeper for Kafka 0.8). If offsets could not be found for a partition, the auto.offset.reset setting in the properties will be used.
  2. setStartFromEarliest() / setStartFromLatest(): Start from the earliest / latest record. Under these modes, committed offsets in Kafka will be ignored and not used as starting positions.
  3. setStartFromTimestamp(long): Start from the specified timestamp. For each partition, the record whose timestamp is larger than or equal to the specified timestamp will be used as the start position. If a partition’s latest record is earlier than the timestamp, the partition will simply be read from the latest record. Under this mode, committed offsets in Kafka will be ignored and not used as starting positions.
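
For mode 1 above, committed offsets are looked up under the group.id you configure, and auto.offset.reset decides what happens when no committed offset exists. A small sketch of those two properties, reusing the imports from the earlier example (the values are illustrative):

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "my-group")           // group whose committed offsets are looked up
props.setProperty("auto.offset.reset", "earliest")  // fallback when a partition has no committed offset

val myConsumer = new FlinkKafkaConsumer[String]("myTopic", new SimpleStringSchema(), props)
myConsumer.setStartFromGroupOffsets()  // the default, shown explicitly here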

You can also specify the exact offsets the consumer should start from for each partition, and each partition can be given a different starting position (though one wonders when that would be useful):

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

// Map each (topic, partition) pair to the offset the consumer should start from.
val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

Note that these start-position settings have no effect when a job recovers from a failure: in that case every partition's offset is restored from the checkpoint or savepoint.

Kafka Consumers and Fault Tolerance
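
With checkpointing enabled, the consumer commits offsets back to Kafka only so that external tools can track consumption progress; recovery itself uses the checkpointed offsets, as noted above. A brief sketch of the relevant switches (the checkpoint interval is illustrative):

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)  // checkpoint every 5 seconds; offsets travel with these checkpoints

val myConsumer = new FlinkKafkaConsumer[String]("myTopic", new SimpleStringSchema(), props)
myConsumer.setCommitOffsetsOnCheckpoints(true)  // default: also commit offsets to Kafka when a checkpoint completes

// Without checkpointing, the consumer falls back to Kafka's periodic auto-commit:
// props.setProperty("enable.auto.commit", "true")
// props.setProperty("auto.commit.interval.ms", "1000")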

Kafka Consumers Topic and Partition Discovery
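
Partition discovery is disabled by default; setting the property below to a non-negative value in the consumer properties makes the consumer periodically pick up partitions that are added after the job starts (the interval shown is illustrative). Subscribing by topic pattern enables topic discovery in the same way:

props.setProperty("flink.partition-discovery.interval-millis", "10000")  // probe for new partitions every 10 s

// Topic discovery: subscribe to every topic matching a regex (hypothetical pattern).
import java.util.regex.Pattern
val myConsumer = new FlinkKafkaConsumer[String](
  Pattern.compile("myTopic-[0-9]+"),
  new SimpleStringSchema(),
  props)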

Warning
This article was last updated on February 1, 2017; its content may be out of date. Use it with caution.