阅读有关Kafka的《Kafka学习笔记》之“6. Kafka消费者 之 offset的维护”、”2. Consumer消费数据流程”章节,通过offset偏移量便故障(现断电宕机等故障)恢复后继续消费
1)索引文件+数据文件(offset偏移量关联)
注:kafka会保存每个topic数据消费的记录offset,以便记录consumer消费到哪个数据了
2)Consumer消费成功后,再提交offset偏移量并持久化 - 同步提交和异步提交区分
2.1)同步提交代码示例
// 同步提交: consumer提交完毕offset之后,才会继续消费数据。 @b@//3. 消费数据 while (true){ //JDK1.8 的API 毫秒数,@b@ ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100));@b@ for (ConsumerRecord<String, String> cr : crs) { @b@ System.out.println("cr = " + cr); @b@ } @b@ kafkaConsumer.commitAsync();@b@}
2.2)异步提交代码示例
// 异步提交: consumer只需要发出提交offset的指令之后,就可以继续消费数据,不需要等待本地offset 是否提交成功。 @b@while (true){ @b@ //JDK1.8 的API 毫秒数,@b@ ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100)); @b@ for (ConsumerRecord<String, String> cr : crs) { @b@ System.out.println("cr = " + cr);@b@ } @b@ kafkaConsumer.commitSync(); @b@}