一、前言
本文通过实现KafkaUtils工具类,在代码中直接创建Apache Kafka的消息消费者,并读取生产者/消费者的配置属性,主要涉及创建消息消费者createConsumer、创建消息监听器createMessageListener等操作。
二、示例代码
1.KafkaUtils工具类
import java.io.IOException;@b@import java.util.ArrayList;@b@import java.util.Collection;@b@import java.util.Iterator;@b@import java.util.LinkedHashMap;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Properties;@b@import java.util.Set;@b@import java.util.regex.Pattern;@b@@b@import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;@b@import org.apache.kafka.clients.consumer.ConsumerRecord;@b@import org.apache.kafka.clients.consumer.ConsumerRecords;@b@import org.apache.kafka.clients.consumer.OffsetAndMetadata;@b@import org.apache.kafka.common.TopicPartition;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@import org.springframework.core.io.Resource;@b@import org.springframework.util.Assert;@b@import org.springframework.util.ClassUtils;@b@import org.springframework.util.CollectionUtils;@b@import org.springframework.util.StringUtils;@b@@b@import com.alibaba.fastjson.JSONObject;@b@import com.woopa.kafka.consumer.MessageConsumer;@b@import com.woopa.kafka.listener.MessageListener; @b@@b@public class KafkaUtils {@b@ private final static Logger logger = LoggerFactory.getLogger(KafkaUtils.class);@b@ private final static String PROPERTY_PREFIX = "kafka.";@b@ private final static String PROPERTY_CONSUMER_PREFIX = "kafka.consumer.";@b@ private final static String PROPERTY_PRODUCER_PREFIX = "kafka.producer.";@b@ public final static String TOPIC_PATTERN_KEY = "topic.regex";@b@@b@ public static Collection<String> resolveTopics(Collection<String> topics, String topicPrefix) {@b@ Assert.notEmpty(topics, "Topics should not be empty!");@b@ List<String> list = new ArrayList<String>(topics.size());@b@ for (String topic : topics) {@b@ list.add(topicPrefix + topic);@b@ }@b@ return list;@b@ }@b@@b@ public static <K, V> Map<TopicPartition, OffsetAndMetadata> convertToOffsets(@b@ ConsumerRecords<K, V> records) {@b@ Iterator<ConsumerRecord<K, V>> iterator = records.iterator();@b@ Map<TopicPartition, OffsetAndMetadata> offsets =@b@ new 
LinkedHashMap<TopicPartition, OffsetAndMetadata>();@b@ ConsumerRecord<K, V> record = null;@b@ while (iterator.hasNext()) {@b@ record = iterator.next();@b@ offsets.put(new TopicPartition(record.topic(), record.partition()),@b@ new OffsetAndMetadata(record.offset() + 1));@b@ }@b@ return offsets;@b@ }@b@@b@ public static <K, V> Map<TopicPartition, OffsetAndMetadata> convertToOffsets(@b@ ConsumerRecord<K, V> record) {@b@ Map<TopicPartition, OffsetAndMetadata> offsets =@b@ new LinkedHashMap<TopicPartition, OffsetAndMetadata>();@b@ offsets.put(new TopicPartition(record.topic(), record.partition()),@b@ new OffsetAndMetadata(record.offset() + 1));@b@ return offsets;@b@ }@b@@b@ /**@b@ * Create a new consumer.@b@ * @b@ * @return@b@ */@b@ public static <K, V> MessageConsumer<K, V> createConsumer(String consumerClass, Resource resource,@b@ String topicPrefix, List<String> topics) {@b@ return createConsumer(consumerClass, retrieveConsumerProperties(resource), topicPrefix, topics);@b@ }@b@@b@ /**@b@ * Create a new consumer.@b@ * @b@ * @return@b@ */@b@ public static <K, V> MessageConsumer<K, V> createConsumer(String consumerClass,@b@ Properties properties, String topicPrefix) {@b@ return createConsumer(consumerClass, retrieveConsumerProperties(properties), topicPrefix, null);@b@ }@b@@b@ /**@b@ * Create a new consumer.@b@ * @b@ * @return@b@ */@b@ @SuppressWarnings("unchecked")@b@ public static <K, V> MessageConsumer<K, V> createConsumer(String consumerClass,@b@ Properties properties, String topicPrefix, List<String> topics) {@b@ logger.info("Create a consumer: consumerClass={},topicPrefix={},topics={},properties={}",@b@ consumerClass, topicPrefix, JSONObject.toJSONString(topics), properties);@b@ try {@b@ String topicRegex = retrieveTopicRegex(properties);@b@ Class<?> clazz = ClassUtils.forName(consumerClass, ClassUtils.getDefaultClassLoader());@b@ final MessageConsumer<K, V> consumer = (MessageConsumer<K, V>) clazz@b@ .getDeclaredConstructor(new Class[] {String.class, 
Properties.class})@b@ .newInstance(new Object[] {topicPrefix, properties});@b@ consumer.afterPropertiesSet();@b@ if (StringUtils.hasText(topicRegex)) {@b@ consumer.getNativeConsumer().subscribe(Pattern.compile(topicRegex),@b@ new DefaultConsumerRebalanceListener<K, V>(consumer));@b@ } else if (!CollectionUtils.isEmpty(topics)) {@b@ consumer.subscribe(topics, new DefaultConsumerRebalanceListener<K, V>(consumer));@b@ }@b@ return consumer;@b@ } catch (Throwable e) {@b@ throw new RuntimeException(e.getMessage(), e);@b@ }@b@ }@b@@b@ private static String retrieveTopicRegex(Properties properties) {@b@ String patternValue = properties.getProperty(TOPIC_PATTERN_KEY);@b@ if (StringUtils.hasLength(patternValue)) {@b@ properties.remove(TOPIC_PATTERN_KEY);@b@ }@b@ return patternValue;@b@ }@b@@b@ @SuppressWarnings("unchecked")@b@ public static <K, V> MessageListener<K, V> createMessageListener(String messageListenerClass) {@b@ Assert.hasText(messageListenerClass, "MessageListener should not be empty!");@b@ MessageListener<K, V> listener = null;@b@ try {@b@ Class<?> clazz = ClassUtils.forName(messageListenerClass, ClassUtils.getDefaultClassLoader());@b@ listener = (MessageListener<K, V>) clazz.newInstance();@b@ } catch (Throwable e) {@b@ throw new RuntimeException(e.getMessage(), e);@b@ }@b@ return listener;@b@ }@b@@b@ public static Properties retrieveConsumerProperties(Resource resource) {@b@ return retrieveProperties(convertToProperties(resource), PROPERTY_CONSUMER_PREFIX);@b@ }@b@@b@ public static Properties retrieveConsumerProperties(Properties properties) {@b@ return retrieveProperties(properties, PROPERTY_CONSUMER_PREFIX);@b@ }@b@@b@ public static Properties retrieveProducerProperties(Resource resource) {@b@ return retrieveProperties(convertToProperties(resource), PROPERTY_PRODUCER_PREFIX);@b@ }@b@@b@ public static Properties retrieveProducerProperties(Properties properties) {@b@ return retrieveProperties(properties, PROPERTY_PRODUCER_PREFIX);@b@ }@b@@b@ private static 
Properties retrieveProperties(Properties properties, String otherPrefix) {@b@ Properties kafkaPro = new Properties();@b@ Set<java.util.Map.Entry<Object, Object>> set = properties.entrySet();@b@ String key = null;@b@ for (java.util.Map.Entry<Object, Object> entry : set) {@b@ key = (String) entry.getKey();@b@ if (StringUtils.hasText(key)) {@b@ if (key.startsWith(otherPrefix)) {@b@ kafkaPro.put(key.substring(otherPrefix.length()), entry.getValue());@b@ } else if (key.startsWith(PROPERTY_PREFIX, 0)) {@b@ if ((PROPERTY_PRODUCER_PREFIX.equals(otherPrefix)@b@ && !key.startsWith(PROPERTY_CONSUMER_PREFIX))@b@ || (PROPERTY_CONSUMER_PREFIX.equals(otherPrefix)@b@ && !key.startsWith(PROPERTY_PRODUCER_PREFIX))) {@b@ kafkaPro.put(key.substring(PROPERTY_PREFIX.length()), entry.getValue());@b@ }@b@ }@b@ }@b@ }@b@ return kafkaPro;@b@ }@b@@b@ public static Properties convertToProperties(Resource resource) {@b@ try {@b@ Properties properties = new Properties();@b@ properties.load(resource.getInputStream());@b@ return properties;@b@ } catch (IOException e) { @b@ }@b@ }@b@@b@ private static class DefaultConsumerRebalanceListener<K, V> implements ConsumerRebalanceListener {@b@ private MessageConsumer<K, V> consumer;@b@@b@ public DefaultConsumerRebalanceListener(MessageConsumer<K, V> consumer) {@b@ this.consumer = consumer;@b@ }@b@@b@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {@b@ consumer.commitSync();@b@ logger.info("Rebalance occured, commit offset manually!");@b@ }@b@@b@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {@b@ logger.info("Partitions are assigned");@b@ }@b@ }@b@}
2.MessageListener监听接口
import org.apache.kafka.clients.consumer.ConsumerRecords;@b@@b@public interface MessageListener<K, V> {@b@ @b@ void onMessage(ConsumerRecords<K, V> data);@b@ @b@ void onMessage(ConsumerRecords<K, V> data, CommitCallback<K, V> callback);@b@}
3.MessageConsumer消费接口
import java.util.Collection;@b@import java.util.Map;@b@@b@import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;@b@import org.apache.kafka.clients.consumer.ConsumerRecords;@b@import org.apache.kafka.clients.consumer.KafkaConsumer;@b@import org.apache.kafka.clients.consumer.OffsetAndMetadata;@b@import org.apache.kafka.common.TopicPartition;@b@import org.springframework.beans.factory.InitializingBean;@b@@b@@b@@b@public interface MessageConsumer<K, V> extends InitializingBean {@b@@b@ KafkaConsumer<K, V> getNativeConsumer();@b@@b@ ConsumerRecords<K, V> poll();@b@@b@ ConsumerRecords<K, V> poll(long timeout);@b@@b@ void seek(TopicPartition topicPartition, long offset);@b@@b@ void subscribe(Collection<String> topics);@b@@b@ void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);@b@@b@ void commitSync();@b@@b@ void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);@b@@b@ void close();@b@}