首页

定义KafkaUtils工具类，通过代码创建Apache Kafka消息消费者（Consumer）和生产者对象的源码示例

标签:KafkaUtils,kafka,工具类,apache     发布时间:2018-03-10   

一、前言

本文通过实现KafkaUtils工具类，直接在代码中创建Apache Kafka的消息生产者与消费者，主要涉及创建消息消费者（createConsumer）、创建消息监听器（createMessageListener）等操作。

二、示例代码

1.KafkaUtils工具类

import java.io.IOException;@b@import java.util.ArrayList;@b@import java.util.Collection;@b@import java.util.Iterator;@b@import java.util.LinkedHashMap;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Properties;@b@import java.util.Set;@b@import java.util.regex.Pattern;@b@@b@import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;@b@import org.apache.kafka.clients.consumer.ConsumerRecord;@b@import org.apache.kafka.clients.consumer.ConsumerRecords;@b@import org.apache.kafka.clients.consumer.OffsetAndMetadata;@b@import org.apache.kafka.common.TopicPartition;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@import org.springframework.core.io.Resource;@b@import org.springframework.util.Assert;@b@import org.springframework.util.ClassUtils;@b@import org.springframework.util.CollectionUtils;@b@import org.springframework.util.StringUtils;@b@@b@import com.alibaba.fastjson.JSONObject;@b@import com.woopa.kafka.consumer.MessageConsumer;@b@import com.woopa.kafka.listener.MessageListener; @b@@b@public class KafkaUtils {@b@  private final static Logger logger = LoggerFactory.getLogger(KafkaUtils.class);@b@  private final static String PROPERTY_PREFIX = "kafka.";@b@  private final static String PROPERTY_CONSUMER_PREFIX = "kafka.consumer.";@b@  private final static String PROPERTY_PRODUCER_PREFIX = "kafka.producer.";@b@  public final static String TOPIC_PATTERN_KEY = "topic.regex";@b@@b@  public static Collection<String> resolveTopics(Collection<String> topics, String topicPrefix) {@b@    Assert.notEmpty(topics, "Topics should not be empty!");@b@    List<String> list = new ArrayList<String>(topics.size());@b@    for (String topic : topics) {@b@      list.add(topicPrefix + topic);@b@    }@b@    return list;@b@  }@b@@b@  public static <K, V> Map<TopicPartition, OffsetAndMetadata> convertToOffsets(@b@      ConsumerRecords<K, V> records) {@b@    Iterator<ConsumerRecord<K, V>> iterator = records.iterator();@b@    Map<TopicPartition, 
OffsetAndMetadata> offsets =@b@        new LinkedHashMap<TopicPartition, OffsetAndMetadata>();@b@    ConsumerRecord<K, V> record = null;@b@    while (iterator.hasNext()) {@b@      record = iterator.next();@b@      offsets.put(new TopicPartition(record.topic(), record.partition()),@b@          new OffsetAndMetadata(record.offset() + 1));@b@    }@b@    return offsets;@b@  }@b@@b@  public static <K, V> Map<TopicPartition, OffsetAndMetadata> convertToOffsets(@b@      ConsumerRecord<K, V> record) {@b@    Map<TopicPartition, OffsetAndMetadata> offsets =@b@        new LinkedHashMap<TopicPartition, OffsetAndMetadata>();@b@    offsets.put(new TopicPartition(record.topic(), record.partition()),@b@        new OffsetAndMetadata(record.offset() + 1));@b@    return offsets;@b@  }@b@@b@  /**@b@   * Create a new consumer.@b@   * @b@   * @return@b@   */@b@  public static <K, V> MessageConsumer<K, V> createConsumer(String consumerClass, Resource resource,@b@      String topicPrefix, List<String> topics) {@b@    return createConsumer(consumerClass, retrieveConsumerProperties(resource), topicPrefix, topics);@b@  }@b@@b@  /**@b@   * Create a new consumer.@b@   * @b@   * @return@b@   */@b@  public static <K, V> MessageConsumer<K, V> createConsumer(String consumerClass,@b@      Properties properties, String topicPrefix) {@b@    return createConsumer(consumerClass, retrieveConsumerProperties(properties), topicPrefix, null);@b@  }@b@@b@  /**@b@   * Create a new consumer.@b@   * @b@   * @return@b@   */@b@  @SuppressWarnings("unchecked")@b@  public static <K, V> MessageConsumer<K, V> createConsumer(String consumerClass,@b@      Properties properties, String topicPrefix, List<String> topics) {@b@    logger.info("Create a consumer: consumerClass={},topicPrefix={},topics={},properties={}",@b@        consumerClass, topicPrefix, JSONObject.toJSONString(topics), properties);@b@    try {@b@      String topicRegex = retrieveTopicRegex(properties);@b@      Class<?> clazz = 
ClassUtils.forName(consumerClass, ClassUtils.getDefaultClassLoader());@b@      final MessageConsumer<K, V> consumer = (MessageConsumer<K, V>) clazz@b@          .getDeclaredConstructor(new Class[] {String.class, Properties.class})@b@          .newInstance(new Object[] {topicPrefix, properties});@b@      consumer.afterPropertiesSet();@b@      if (StringUtils.hasText(topicRegex)) {@b@        consumer.getNativeConsumer().subscribe(Pattern.compile(topicRegex),@b@            new DefaultConsumerRebalanceListener<K, V>(consumer));@b@      } else if (!CollectionUtils.isEmpty(topics)) {@b@        consumer.subscribe(topics, new DefaultConsumerRebalanceListener<K, V>(consumer));@b@      }@b@      return consumer;@b@    } catch (Throwable e) {@b@      throw new RuntimeException(e.getMessage(), e);@b@    }@b@  }@b@@b@  private static String retrieveTopicRegex(Properties properties) {@b@    String patternValue = properties.getProperty(TOPIC_PATTERN_KEY);@b@    if (StringUtils.hasLength(patternValue)) {@b@      properties.remove(TOPIC_PATTERN_KEY);@b@    }@b@    return patternValue;@b@  }@b@@b@  @SuppressWarnings("unchecked")@b@  public static <K, V> MessageListener<K, V> createMessageListener(String messageListenerClass) {@b@    Assert.hasText(messageListenerClass, "MessageListener should not be empty!");@b@    MessageListener<K, V> listener = null;@b@    try {@b@      Class<?> clazz = ClassUtils.forName(messageListenerClass, ClassUtils.getDefaultClassLoader());@b@      listener = (MessageListener<K, V>) clazz.newInstance();@b@    } catch (Throwable e) {@b@      throw new RuntimeException(e.getMessage(), e);@b@    }@b@    return listener;@b@  }@b@@b@  public static Properties retrieveConsumerProperties(Resource resource) {@b@    return retrieveProperties(convertToProperties(resource), PROPERTY_CONSUMER_PREFIX);@b@  }@b@@b@  public static Properties retrieveConsumerProperties(Properties properties) {@b@    return retrieveProperties(properties, PROPERTY_CONSUMER_PREFIX);@b@  
}@b@@b@  public static Properties retrieveProducerProperties(Resource resource) {@b@    return retrieveProperties(convertToProperties(resource), PROPERTY_PRODUCER_PREFIX);@b@  }@b@@b@  public static Properties retrieveProducerProperties(Properties properties) {@b@    return retrieveProperties(properties, PROPERTY_PRODUCER_PREFIX);@b@  }@b@@b@  private static Properties retrieveProperties(Properties properties, String otherPrefix) {@b@    Properties kafkaPro = new Properties();@b@    Set<java.util.Map.Entry<Object, Object>> set = properties.entrySet();@b@    String key = null;@b@    for (java.util.Map.Entry<Object, Object> entry : set) {@b@      key = (String) entry.getKey();@b@      if (StringUtils.hasText(key)) {@b@        if (key.startsWith(otherPrefix)) {@b@          kafkaPro.put(key.substring(otherPrefix.length()), entry.getValue());@b@        } else if (key.startsWith(PROPERTY_PREFIX, 0)) {@b@          if ((PROPERTY_PRODUCER_PREFIX.equals(otherPrefix)@b@              && !key.startsWith(PROPERTY_CONSUMER_PREFIX))@b@              || (PROPERTY_CONSUMER_PREFIX.equals(otherPrefix)@b@                  && !key.startsWith(PROPERTY_PRODUCER_PREFIX))) {@b@            kafkaPro.put(key.substring(PROPERTY_PREFIX.length()), entry.getValue());@b@          }@b@        }@b@      }@b@    }@b@    return kafkaPro;@b@  }@b@@b@  public static Properties convertToProperties(Resource resource) {@b@    try {@b@      Properties properties = new Properties();@b@      properties.load(resource.getInputStream());@b@      return properties;@b@    } catch (IOException e) { @b@    }@b@  }@b@@b@  private static class DefaultConsumerRebalanceListener<K, V> implements ConsumerRebalanceListener {@b@    private MessageConsumer<K, V> consumer;@b@@b@    public DefaultConsumerRebalanceListener(MessageConsumer<K, V> consumer) {@b@      this.consumer = consumer;@b@    }@b@@b@    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {@b@      consumer.commitSync();@b@      
logger.info("Rebalance occured, commit offset manually!");@b@    }@b@@b@    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {@b@      logger.info("Partitions are assigned");@b@    }@b@  }@b@}

2.MessageListener监听接口

import org.apache.kafka.clients.consumer.ConsumerRecords;@b@@b@public interface MessageListener<K, V> {@b@	@b@	void onMessage(ConsumerRecords<K, V> data);@b@	@b@	void onMessage(ConsumerRecords<K, V> data, CommitCallback<K, V> callback);@b@}

3.MessageConsumer消费接口

import java.util.Collection;@b@import java.util.Map;@b@@b@import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;@b@import org.apache.kafka.clients.consumer.ConsumerRecords;@b@import org.apache.kafka.clients.consumer.KafkaConsumer;@b@import org.apache.kafka.clients.consumer.OffsetAndMetadata;@b@import org.apache.kafka.common.TopicPartition;@b@import org.springframework.beans.factory.InitializingBean;@b@@b@@b@@b@public interface MessageConsumer<K, V> extends InitializingBean {@b@@b@	KafkaConsumer<K, V> getNativeConsumer();@b@@b@	ConsumerRecords<K, V> poll();@b@@b@	ConsumerRecords<K, V> poll(long timeout);@b@@b@	void seek(TopicPartition topicPartition, long offset);@b@@b@	void subscribe(Collection<String> topics);@b@@b@	void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);@b@@b@	void commitSync();@b@@b@	void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets);@b@@b@	void close();@b@}