首页  |  知识库  |  资源下载  |  在线工具  |  A-Z  •  JAR  •  名词查         

解决“java实现kafka客户端消息产生订阅正常,但消费者获取消息consumer.poll被卡住了,无法正常打印接受消息日志”问题

标签:kafka消息订阅问题,consumer.poll,kafka消费者卡住了     发布时间:2019-03-25   

一、问题描述

关于kafka客户端消息订阅产生demo示例中,运行KafkaProducerTest消息发送可以产生消息,但是运行消息消费订阅时候,卡在“ConsumerRecords<String, String> consumerDatas=consumer.poll(100);”代码段,无法继续往下执行了,造成无法正常消费订阅消息并打印有效日志。

二、解决方法

1. 通过服务端bin目录提供命令行工具测试本地服务kafka-console-producer.sh和kafka-console-consumer.sh)kafka安装发送接收消息正常

2. 关于config/server.properties中的listeners=PLAINTEXT://:9092改为具体本地ip地址l如isteners=PLAINTEXT://192.168.1.18:9092 ;其他更多相关配置注意点参考demo示例说明部分

三、代码说明

1. KafkaConsumerTest类

package com.xwood.test.kafka;@b@@b@import java.util.Arrays;@b@import java.util.Iterator;@b@import java.util.Properties;@b@@b@import org.apache.kafka.clients.consumer.ConsumerRecord;@b@import org.apache.kafka.clients.consumer.ConsumerRecords;@b@import org.apache.kafka.clients.consumer.KafkaConsumer;@b@@b@public class KafkaConsumerTest {@b@	@b@	private static final String topic = "kafkaTopic";@b@    @b@	public static void main(String[] args)  throws  Exception{@b@		@b@		 	Properties props = new Properties();@b@	        props.put("bootstrap.servers", "192.168.1.202:9092");//单节点,kafka多节点时候使用,逗号隔开@b@	        props.put("group.id", "test-consumer-group"); //定义消费组@b@	        props.put("enable.auto.commit", "true");@b@	        props.put("auto.commit.interval.ms", "1000");@b@	        props.put("auto.offset.reset", "earliest");@b@//	        props.put("partition.assignment.strategy", "range");@b@//	        props.put("session.timeout.ms", "30000");@b@	       @b@	        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");@b@	        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");@b@@b@	        @b@	        @b@	        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);@b@	        consumer.subscribe(Arrays.asList(topic));//订阅主题@b@	        @b@	        while(true){@b@	        	ConsumerRecords<String, String> consumerDatas=consumer.poll(100);@b@	        	@b@	        	if(consumerDatas!=null){@b@	        		Iterator<ConsumerRecord<String, String>> consumerIter=consumerDatas.iterator();@b@		        	while(consumerIter.hasNext()){@b@		        		ConsumerRecord<String, String>  consumerData=consumerIter.next();@b@		        		System.out.printf("offset = %d, key = %s, value = %s%n", consumerData.offset(), consumerData.key(), consumerData.value());@b@		        	}@b@	        	}else{@b@	        		System.out.println("KafkaConsumer1 is waiting message...");@b@	        		Thread.sleep(1000);@b@	        	}@b@	        	@b@	        	@b@	        }@b@@b@	}@b@	@b@@b@}

2. KafkaProducerTest类

package com.xwood.test.kafka;@b@@b@import java.util.Properties;@b@@b@import org.apache.kafka.clients.producer.KafkaProducer;@b@import org.apache.kafka.clients.producer.ProducerRecord;@b@@b@public class KafkaProducerTest {@b@	@b@	// Topic@b@    private static final String topic = "kafkaTopic";@b@@b@	public static void main(String[] args)  throws  Exception{@b@		@b@		    Properties props = new Properties();@b@		    props.put("bootstrap.servers", "192.168.1.202:9092");@b@		    props.put("acks", "0");@b@//	        props.put("group.id", "test-consumer-group");@b@	        props.put("retries", "0");@b@	         //设置key和value序列化方式@b@	        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");@b@	        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");@b@@b@	        //生产者实例@b@	        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);@b@@b@	        int i = 1;@b@@b@	        // 发送业务消息@b@	        // 读取文件 读取内存数据库 读socket端口@b@	        while (true) {@b@	            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));@b@	            System.out.println("key:" + i + " " + "value:" + i);@b@	            i++;@b@	            Thread.sleep(5000);@b@	        }  @b@		 @b@	}@b@	@b@@b@}
  • ◆ 相关内容