一、问题描述
在kafka客户端消息生产与订阅的demo示例中,运行KafkaProducerTest可以正常发送消息,但运行KafkaConsumerTest进行消费订阅时,程序卡在“ConsumerRecords<String, String> consumerDatas=consumer.poll(100);”这一行代码,无法继续往下执行,导致无法正常消费订阅消息并打印有效日志。
二、解决方法
1. 通过服务端bin目录提供的命令行工具(kafka-console-producer.sh和kafka-console-consumer.sh)测试本地服务,确认kafka安装及发送、接收消息正常
2. 将config/server.properties中的listeners=PLAINTEXT://:9092改为具体的本地ip地址,如listeners=PLAINTEXT://192.168.1.18:9092;其他更多相关配置注意点参考demo示例说明部分
三、代码说明
1. KafkaConsumerTest类
package com.xwood.test.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Demo consumer: subscribes to {@code kafkaTopic} and prints the offset, key and value
 * of every record it receives. Runs until the process is stopped or an exception is thrown.
 */
public class KafkaConsumerTest {

    /** Topic this demo subscribes to. */
    private static final String TOPIC = "kafkaTopic";

    /** Timeout, in milliseconds, passed to each poll() call. */
    private static final long POLL_TIMEOUT_MS = 100;

    /** Pause, in milliseconds, after a poll that returned no records. */
    private static final long IDLE_SLEEP_MS = 1000;

    /**
     * Builds the consumer configuration, subscribes to the topic and polls forever.
     *
     * @param args unused
     * @throws Exception if the sleep between idle polls is interrupted or the client fails
     */
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        // Single broker; for a multi-node cluster list the brokers separated by commas.
        props.put("bootstrap.servers", "192.168.1.202:9092");
        // Consumer group this client joins.
        props.put("group.id", "test-consumer-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        // Optional settings, left disabled:
        // props.put("partition.assignment.strategy", "range");
        // props.put("session.timeout.ms", "30000");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList(TOPIC));

            while (true) {
                // NOTE(review): on kafka-clients 2.0+ poll(long) is deprecated in favour of
                // poll(Duration); kept as-is because the client version in use is not known.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);

                // poll() hands back an empty ConsumerRecords (not null) when nothing arrived
                // within the timeout, so emptiness is the idle signal.
                if (records.isEmpty()) {
                    System.out.println("KafkaConsumer1 is waiting message...");
                    Thread.sleep(IDLE_SLEEP_MS);
                    continue;
                }

                Iterator<ConsumerRecord<String, String>> it = records.iterator();
                while (it.hasNext()) {
                    ConsumerRecord<String, String> record = it.next();
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Reached only when the loop is left by an exception (e.g. an interrupt during
            // the idle sleep); close the consumer so it is not leaked.
            consumer.close();
        }
    }
}
2. KafkaProducerTest类
package com.xwood.test.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Demo producer: sends one {@code key:N}/{@code value:N} record to {@code kafkaTopic}
 * every five seconds and echoes what it sent to standard output.
 */
public class KafkaProducerTest {

    /** Topic this demo publishes to. */
    private static final String TOPIC = "kafkaTopic";

    /** Pause, in milliseconds, between two consecutive sends. */
    private static final long SEND_INTERVAL_MS = 5000;

    /**
     * Builds the producer configuration and sends numbered records forever.
     *
     * @param args unused
     * @throws Exception if the sleep between sends is interrupted or the client fails
     */
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.202:9092");
        props.put("acks", "0");
        props.put("retries", "0");
        // Serializers for the record key and value.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer instance.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            int i = 1;

            // Send the business messages. In a real application the payload could come
            // from a file, an in-memory database or a socket.
            while (true) {
                producer.send(new ProducerRecord<String, String>(TOPIC, "key:" + i, "value:" + i));
                System.out.println("key:" + i + " " + "value:" + i);
                i++;
                Thread.sleep(SEND_INTERVAL_MS);
            }
        } finally {
            // Reached only when the loop is left by an exception (e.g. an interrupt during
            // the sleep); close the producer so it is not leaked.
            producer.close();
        }
    }
}