一、前言
基于apache的log4j的org.apache.logging.log4j.core.appender.AbstractAppender,进行自定义log4j的<Kafka>将日志通过发送消息队列后,并可以将消息便更的状态信息等
二、源码说明
1.KafkaAppender类
package com.xwood.kafka.log4j.appender;@b@@b@import com.xwood.kafka.exception.InitializationError;@b@import java.util.Properties;@b@import java.util.concurrent.locks.Lock;@b@import java.util.concurrent.locks.ReadWriteLock;@b@import java.util.concurrent.locks.ReentrantReadWriteLock;@b@import org.apache.logging.log4j.core.Filter;@b@import org.apache.logging.log4j.core.LogEvent;@b@import org.apache.logging.log4j.core.appender.AbstractAppender;@b@import org.apache.logging.log4j.core.config.plugins.Plugin;@b@import org.apache.logging.log4j.core.config.plugins.PluginAttribute;@b@import org.apache.logging.log4j.core.config.plugins.PluginElement;@b@import org.apache.logging.log4j.core.config.plugins.PluginFactory;@b@import org.apache.logging.log4j.core.util.Booleans;@b@import org.springframework.util.Assert;@b@import org.springframework.util.StringUtils;@b@@b@@Plugin(name="Kafka", category="Core", elementType="appender", printObject=true)@b@public class KafkaAppender extends AbstractAppender@b@{@b@ private static final long serialVersionUID = 4701714603421188915L;@b@ private static final String TOPIC_PREFIX = "xwood_log";@b@ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();@b@ private final Lock writeLock = this.rwLock.writeLock();@b@ private static volatile KafkaLog4jManager manager;@b@@b@ public KafkaAppender(String name, Filter filter, boolean ignoreExceptions)@b@ {@b@ super(name, filter, null, ignoreExceptions);@b@ }@b@@b@ public synchronized void append(LogEvent event) {@b@ this.writeLock.lock();@b@ try {@b@ getManager().writeEvent(event);@b@ } catch (Throwable t) {@b@ }@b@ finally {@b@ this.writeLock.unlock();@b@ }@b@ }@b@@b@ @PluginFactory@b@ public static KafkaAppender createAppender(@PluginAttribute("name") String name, @PluginElement("PropertyConfigs") PropertyConfig[] properties, @PluginAttribute("ignoreExceptions") String ignore, @PluginElement("Filter") Filter filter)@b@ {@b@ String projectId ="xwood_test";@b@ String domainId ="xwood.net.id";@b@ String domain = (!(StringUtils.hasText(domainId))) ? projectId : domainId;@b@ String pappName = "xwood_test"; @b@ boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);@b@ KafkaLog4jManagerData factoryData = new KafkaLog4jManagerData(domain + "_" + pappName, "xwood_log", domain + "_" + pappName, checkProperties(properties));@b@ KafkaAppender appender = new KafkaAppender(name, filter, ignoreExceptions);@b@@b@ if (manager != null) {@b@ manager.stop();@b@ }@b@@b@ manager = new KafkaLog4jManager(factoryData);@b@ return appender;@b@ }@b@@b@ private static Properties checkProperties(PropertyConfig[] properties) {@b@ Assert.notEmpty(properties, "kafka.bootstrap.servers is needed! " + exampleMsg());@b@ Properties kafkaProperties = new Properties();@b@ PropertyConfig[] arrayOfPropertyConfig = properties; int i = arrayOfPropertyConfig.length; for (int j = 0; j < i; ++j) { PropertyConfig config = arrayOfPropertyConfig[j];@b@ kafkaProperties.put(config.getKey(), config.getValue());@b@ }@b@ if ((!(kafkaProperties.containsKey("kafka.bootstrap.servers"))) || (!(StringUtils.hasText(kafkaProperties.getProperty("kafka.bootstrap.servers"))))) {@b@ throw new InitializationError("kafka.bootstrap.servers is needed! " + exampleMsg(), null);@b@ }@b@@b@ if (!(StringUtils.hasText(kafkaProperties.getProperty("kafka.producer.key.serializer"))))@b@ kafkaProperties.put("kafka.producer.key.serializer", "com.xwood.kafka.serialization.JsonSerializer");@b@@b@ if (!(StringUtils.hasText(kafkaProperties.getProperty("kafka.producer.value.serializer"))))@b@ kafkaProperties.put("kafka.producer.value.serializer", "com.xwood.kafka.serialization.JsonSerializer");@b@@b@ return kafkaProperties;@b@ }@b@@b@ private static String exampleMsg() {@b@ StringBuffer msg = new StringBuffer();@b@ msg.append("<Appenders>");@b@ msg.append("<Kafka>");@b@ msg.append("<property key=\"kafka.bootstrap.servers\" value=\"ip:port,ip:port\" />");@b@ msg.append("<property key=\"kafka.producer.client.id\" value=\"log4j-client\" />");@b@ msg.append("......");@b@ msg.append("</Kafka>");@b@ msg.append("</Appenders>");@b@ return msg.toString();@b@ }@b@@b@ public KafkaLog4jManager getManager() {@b@ return manager;@b@ }@b@}
2.KafkaLog4jManager类
package com.xwood.kafka.log4j.appender;@b@@b@import com.xwood.common.utils.PNetUtils;@b@import com.xwood.kafka.core.ProducerTemplate;@b@import com.xwood.kafka.producer.MessageRecord;@b@import com.xwood.log4j.appender.DefaultNoSqlObject;@b@import com.xwood.log4j.appender.NoSqlObject;@b@import java.util.Map;@b@import org.apache.logging.log4j.Level;@b@import org.apache.logging.log4j.Marker;@b@import org.apache.logging.log4j.ThreadContext.ContextStack;@b@import org.apache.logging.log4j.core.LogEvent;@b@import org.apache.logging.log4j.message.Message;@b@@b@public final class KafkaLog4jManager@b@{@b@ private ProducerTemplate<String, Object> producer;@b@ private KafkaLog4jManagerData factoryData;@b@@b@ public KafkaLog4jManager(KafkaLog4jManagerData factoryData)@b@ {@b@ this.factoryData = factoryData;@b@ this.producer = new ProducerTemplate(factoryData.getTopicPrefix(), factoryData.getProperties());@b@ this.producer.start();@b@ }@b@@b@ public void writeEvent(LogEvent event) {@b@ NoSqlObject value = assembleProducerRecord(event);@b@ this.producer.send(new MessageRecord(this.factoryData.getTopic(), value.unwrap()));@b@ }@b@@b@ private NoSqlObject<Map<String, Object>> assembleProducerRecord(LogEvent event) {@b@ NoSqlObject entity = createNoSqlObject();@b@@b@ entity.set("level", event.getLevel().name());@b@ entity.set("loggerName", event.getLoggerName());@b@ entity.set("instanceIp", PNetUtils.getLocalHost());@b@ entity.set("message", (event.getMessage() == null) ? null : event.getMessage().getFormattedMessage());@b@@b@ StackTraceElement source = event.getSource();@b@ if (source == null)@b@ entity.set("source", null);@b@ else {@b@ entity.set("source", convertStackTraceElement(source));@b@ }@b@@b@ Marker marker = event.getMarker();@b@ if (marker == null)@b@ entity.set("marker", null);@b@ else {@b@ entity.set("marker", buildMarkerEntity(marker));@b@ }@b@@b@ entity.set("threadName", event.getThreadName());@b@ entity.set("millis", Long.valueOf(event.getTimeMillis()));@b@@b@ Throwable thrown = event.getThrown();@b@ if (thrown == null) {@b@ entity.set("thrown", null);@b@ } else {@b@ NoSqlObject originalExceptionEntity = createNoSqlObject();@b@ NoSqlObject exceptionEntity = originalExceptionEntity;@b@ exceptionEntity.set("type", thrown.getClass().getName());@b@ exceptionEntity.set("message", thrown.getMessage());@b@ exceptionEntity.set("stackTrace", convertStackTrace(thrown.getStackTrace()));@b@ while (thrown.getCause() != null) {@b@ thrown = thrown.getCause();@b@ NoSqlObject causingExceptionEntity = createNoSqlObject();@b@ causingExceptionEntity.set("type", thrown.getClass().getName());@b@ causingExceptionEntity.set("message", thrown.getMessage());@b@ causingExceptionEntity.set("stackTrace", convertStackTrace(thrown.getStackTrace()));@b@ exceptionEntity.set("cause", causingExceptionEntity);@b@ exceptionEntity = causingExceptionEntity;@b@ }@b@@b@ entity.set("thrown", originalExceptionEntity);@b@ }@b@@b@ ThreadContext.ContextStack contextStack = event.getContextStack();@b@ if (contextStack == null)@b@ entity.set("contextStack", null);@b@ else@b@ entity.set("contextStack", contextStack.asList());@b@@b@ return entity;@b@ }@b@@b@ private NoSqlObject<Map<String, Object>> buildMarkerEntity(Marker marker) {@b@ NoSqlObject entity = createNoSqlObject();@b@ entity.set("name", marker.getName());@b@@b@ Marker[] parents = marker.getParents();@b@ if (parents != null)@b@ {@b@ NoSqlObject[] parentEntities = new NoSqlObject[parents.length];@b@ for (int i = 0; i < parents.length; ++i)@b@ parentEntities[i] = buildMarkerEntity(parents[i]);@b@@b@ entity.set("parents", parentEntities);@b@ }@b@ return entity;@b@ }@b@@b@ private NoSqlObject<Map<String, Object>>[] convertStackTrace(StackTraceElement[] stackTrace) {@b@ NoSqlObject[] stackTraceEntities = createNoSqlObjectList(stackTrace.length);@b@ for (int i = 0; i < stackTrace.length; ++i)@b@ stackTraceEntities[i] = convertStackTraceElement(stackTrace[i]);@b@@b@ return stackTraceEntities;@b@ }@b@@b@ private NoSqlObject<Map<String, Object>> convertStackTraceElement(StackTraceElement element) {@b@ NoSqlObject elementEntity = createNoSqlObject();@b@ elementEntity.set("className", element.getClassName());@b@ elementEntity.set("methodName", element.getMethodName());@b@ elementEntity.set("fileName", element.getFileName());@b@ elementEntity.set("lineNumber", Integer.valueOf(element.getLineNumber()));@b@ return elementEntity;@b@ }@b@@b@ public void stop() {@b@ this.producer.close();@b@ }@b@@b@ private NoSqlObject<Map<String, Object>> createNoSqlObject() {@b@ DefaultNoSqlObject obj = new DefaultNoSqlObject();@b@ obj.set("appName", this.factoryData.getAppName());@b@ return obj;@b@ }@b@@b@ private NoSqlObject<Map<String, Object>>[] createNoSqlObjectList(int length) {@b@ return new DefaultNoSqlObject[length];@b@ }@b@}
三、log4j配置
<Appenders>@b@ <Kafka>@b@ <property key="kafka.bootstrap.servers" value="ip:port,ip:port" />@b@ <property key="kafka.producer.client.id" value="log4j-client" />@b@ </Kafka>@b@</Appenders>