一、前言
通过activemq包(3.2)中关于org.activemq.broker.impl.DefaultBroker的分析,了解实现消息核心类包括事务管理TransactionManager、容量监控DelegateCapacityMonitor、消息监听MessageContainerManager、安全适配及相关管理等。
二、源码说明
1.定义Broker、BrokerAdmin接口
package org.activemq.broker;@b@@b@import java.io.File;@b@import java.util.Hashtable;@b@import java.util.Map;@b@import javax.jms.JMSException;@b@import javax.naming.Context;@b@import javax.transaction.xa.XAException;@b@import org.activemq.capacity.CapacityMonitor;@b@import org.activemq.message.ActiveMQMessage;@b@import org.activemq.message.ActiveMQXid;@b@import org.activemq.message.BrokerInfo;@b@import org.activemq.message.ConnectionInfo;@b@import org.activemq.message.ConsumerInfo;@b@import org.activemq.message.MessageAck;@b@import org.activemq.message.ProducerInfo;@b@import org.activemq.security.SecurityAdapter;@b@import org.activemq.service.DeadLetterPolicy;@b@import org.activemq.service.MessageContainerManager;@b@import org.activemq.service.RedeliveryPolicy;@b@import org.activemq.service.Service;@b@import org.activemq.store.PersistenceAdapter;@b@@b@public abstract interface Broker extends Service, CapacityMonitor@b@{@b@ public abstract BrokerAdmin getBrokerAdmin();@b@@b@ public abstract BrokerInfo getBrokerInfo();@b@@b@ public abstract void addClient(BrokerClient paramBrokerClient, ConnectionInfo paramConnectionInfo)@b@ throws JMSException;@b@@b@ public abstract void removeClient(BrokerClient paramBrokerClient, ConnectionInfo paramConnectionInfo)@b@ throws JMSException;@b@@b@ public abstract void addMessageProducer(BrokerClient paramBrokerClient, ProducerInfo paramProducerInfo)@b@ throws JMSException;@b@@b@ public abstract void removeMessageProducer(BrokerClient paramBrokerClient, ProducerInfo paramProducerInfo)@b@ throws JMSException;@b@@b@ public abstract void addMessageConsumer(BrokerClient paramBrokerClient, ConsumerInfo paramConsumerInfo)@b@ throws JMSException;@b@@b@ public abstract void removeMessageConsumer(BrokerClient paramBrokerClient, ConsumerInfo paramConsumerInfo)@b@ throws JMSException;@b@@b@ public abstract void sendMessage(BrokerClient paramBrokerClient, ActiveMQMessage paramActiveMQMessage)@b@ throws JMSException;@b@@b@ public abstract void acknowledgeMessage(BrokerClient paramBrokerClient, MessageAck paramMessageAck)@b@ throws JMSException;@b@@b@ public abstract ActiveMQXid[] getPreparedTransactions(BrokerClient paramBrokerClient)@b@ throws XAException;@b@@b@ public abstract void deleteSubscription(String paramString1, String paramString2)@b@ throws JMSException;@b@@b@ public abstract void startTransaction(BrokerClient paramBrokerClient, String paramString)@b@ throws JMSException;@b@@b@ public abstract void commitTransaction(BrokerClient paramBrokerClient, String paramString)@b@ throws JMSException;@b@@b@ public abstract void rollbackTransaction(BrokerClient paramBrokerClient, String paramString)@b@ throws JMSException;@b@@b@ public abstract void startTransaction(BrokerClient paramBrokerClient, ActiveMQXid paramActiveMQXid)@b@ throws XAException;@b@@b@ public abstract int prepareTransaction(BrokerClient paramBrokerClient, ActiveMQXid paramActiveMQXid)@b@ throws XAException;@b@@b@ public abstract void rollbackTransaction(BrokerClient paramBrokerClient, ActiveMQXid paramActiveMQXid)@b@ throws XAException;@b@@b@ public abstract void commitTransaction(BrokerClient paramBrokerClient, ActiveMQXid paramActiveMQXid, boolean paramBoolean)@b@ throws XAException;@b@@b@ public abstract File getTempDir();@b@@b@ public abstract String getBrokerName();@b@@b@ public abstract String getBrokerClusterName();@b@@b@ public abstract PersistenceAdapter getPersistenceAdapter();@b@@b@ public abstract void setPersistenceAdapter(PersistenceAdapter paramPersistenceAdapter);@b@@b@ public abstract Map getContainerManagerMap();@b@@b@ public abstract Context getDestinationContext(Hashtable paramHashtable);@b@@b@ public abstract void addConsumerInfoListener(ConsumerInfoListener paramConsumerInfoListener);@b@@b@ public abstract void removeConsumerInfoListener(ConsumerInfoListener paramConsumerInfoListener);@b@@b@ public abstract MessageContainerManager getPersistentTopicContainerManager();@b@@b@ public abstract MessageContainerManager getTransientTopicContainerManager();@b@@b@ public abstract MessageContainerManager getPersistentQueueContainerManager();@b@@b@ public abstract MessageContainerManager getTransientQueueContainerManager();@b@@b@ public abstract SecurityAdapter getSecurityAdapter();@b@@b@ public abstract void setSecurityAdapter(SecurityAdapter paramSecurityAdapter);@b@@b@ public abstract RedeliveryPolicy getRedeliveryPolicy();@b@@b@ public abstract void setRedeliveryPolicy(RedeliveryPolicy paramRedeliveryPolicy);@b@@b@ public abstract DeadLetterPolicy getDeadLetterPolicy();@b@@b@ public abstract void setDeadLetterPolicy(DeadLetterPolicy paramDeadLetterPolicy);@b@@b@ public abstract void sendToDeadLetterQueue(String paramString, ActiveMQMessage paramActiveMQMessage)@b@ throws JMSException;@b@}
package org.activemq.broker;@b@@b@import javax.jms.JMSException;@b@import org.activemq.message.ActiveMQDestination;@b@import org.activemq.service.MessageContainerAdmin;@b@@b@public abstract interface BrokerAdmin@b@{@b@ public abstract void createMessageContainer(ActiveMQDestination paramActiveMQDestination)@b@ throws JMSException;@b@@b@ public abstract void destoryMessageContainer(ActiveMQDestination paramActiveMQDestination)@b@ throws JMSException;@b@@b@ public abstract MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination paramActiveMQDestination)@b@ throws JMSException;@b@@b@ public abstract MessageContainerAdmin[] listMessageContainerAdmin()@b@ throws JMSException;@b@}
2.DefaultBroker
package org.activemq.broker.impl;@b@@b@import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;@b@import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;@b@import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;@b@import java.io.File;@b@import java.io.IOException;@b@import java.util.ArrayList;@b@import java.util.Collection;@b@import java.util.Hashtable;@b@import java.util.Iterator;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import java.util.Set;@b@import javax.jms.JMSException;@b@import javax.naming.Context;@b@import javax.transaction.xa.XAException;@b@import org.activemq.broker.Broker;@b@import org.activemq.broker.BrokerAdmin;@b@import org.activemq.broker.BrokerClient;@b@import org.activemq.broker.ConsumerInfoListener;@b@import org.activemq.capacity.DelegateCapacityMonitor;@b@import org.activemq.io.util.MemoryBoundedObjectManager;@b@import org.activemq.io.util.MemoryBoundedQueueManager;@b@import org.activemq.jndi.ReadOnlyContext;@b@import org.activemq.message.ActiveMQDestination;@b@import org.activemq.message.ActiveMQMessage;@b@import org.activemq.message.ActiveMQXid;@b@import org.activemq.message.BrokerInfo;@b@import org.activemq.message.ConnectionInfo;@b@import org.activemq.message.ConsumerInfo;@b@import org.activemq.message.MessageAck;@b@import org.activemq.message.ProducerInfo;@b@import org.activemq.security.SecurityAdapter;@b@import org.activemq.service.DeadLetterPolicy;@b@import org.activemq.service.MessageContainerAdmin;@b@import org.activemq.service.MessageContainerManager;@b@import org.activemq.service.RedeliveryPolicy;@b@import org.activemq.service.Transaction;@b@import org.activemq.service.TransactionManager;@b@import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager;@b@import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager;@b@import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager;@b@import org.activemq.service.impl.DurableTopicMessageContainerManager;@b@import org.activemq.store.PersistenceAdapter;@b@import org.activemq.store.PersistenceAdapterFactory;@b@import org.activemq.store.TransactionStore;@b@import org.activemq.store.vm.VMPersistenceAdapter;@b@import org.activemq.store.vm.VMTransactionManager;@b@import org.activemq.util.Callback;@b@import org.activemq.util.ExceptionTemplate;@b@import org.activemq.util.JMSExceptionHelper;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@@b@public class DefaultBroker extends DelegateCapacityMonitor@b@ implements Broker, BrokerAdmin@b@{@b@ private static final Log log = LogFactory.getLog(DefaultBroker.class);@b@ protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";@b@ protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory";@b@ protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = { File.class };@b@ private static final long DEFAULT_MAX_MEMORY_USAGE = 20971520L;@b@ private PersistenceAdapter persistenceAdapter;@b@ private TransactionManager transactionManager;@b@ private MessageContainerManager[] containerManagers;@b@ private File tempDir;@b@ private MemoryBoundedObjectManager memoryManager;@b@ private MemoryBoundedQueueManager queueManager;@b@ private TransactionStore preparedTransactionStore;@b@ private Map containerManagerMap;@b@ private CopyOnWriteArrayList consumerInfoListeners;@b@ private MessageContainerManager persistentTopicMCM;@b@ private MessageContainerManager transientTopicMCM;@b@ private TransientQueueBoundedMessageManager transientQueueMCM;@b@ private DurableQueueBoundedMessageManager persistentQueueMCM;@b@ private SecurityAdapter securityAdapter;@b@ private RedeliveryPolicy redeliveryPolicy;@b@ private DeadLetterPolicy deadLetterPolicy;@b@ private AdvisorySupport advisory;@b@ private Map messageConsumers;@b@ private BrokerInfo brokerInfo;@b@ private SynchronizedBoolean started;@b@ private BrokerContainerImpl brokerContainer;@b@@b@ public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager)@b@ {@b@ this.messageConsumers = new ConcurrentHashMap();@b@@b@ this.started = new SynchronizedBoolean(false);@b@@b@ this.brokerInfo = new LocalBrokerInfo(this);@b@ this.brokerInfo.setBrokerName(brokerName);@b@ this.brokerInfo.setClusterName(brokerClusterName);@b@ this.memoryManager = memoryManager;@b@ this.queueManager = new MemoryBoundedQueueManager(memoryManager);@b@ setDelegate(memoryManager);@b@ this.containerManagerMap = new ConcurrentHashMap();@b@ this.consumerInfoListeners = new CopyOnWriteArrayList();@b@ this.advisory = new AdvisorySupport(this);@b@ }@b@@b@ public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) {@b@ this(brokerName, "default", memoryManager);@b@ }@b@@b@ public DefaultBroker(String brokerName, String cluserName) {@b@ this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", 20971520L));@b@ }@b@@b@ public DefaultBroker(String brokerName) {@b@ this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", 20971520L));@b@ }@b@@b@ public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {@b@ this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", 20971520L));@b@ this.persistenceAdapter = persistenceAdapter;@b@ }@b@@b@ public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {@b@ this(brokerName);@b@ this.persistenceAdapter = persistenceAdapter;@b@ }@b@@b@ public boolean isStarted() {@b@ return this.started.get();@b@ }@b@@b@ public void start()@b@ throws JMSException@b@ {@b@ if (this.started.commit(false, true)) {@b@ if (this.redeliveryPolicy == null)@b@ this.redeliveryPolicy = new RedeliveryPolicy();@b@@b@ if (this.deadLetterPolicy == null)@b@ this.deadLetterPolicy = new DeadLetterPolicy(this);@b@@b@ if (this.persistenceAdapter == null)@b@ this.persistenceAdapter = createPersistenceAdapter();@b@@b@ this.persistenceAdapter.start();@b@@b@ if (this.transactionManager == null) {@b@ this.preparedTransactionStore = this.persistenceAdapter.createTransactionStore();@b@ this.transactionManager = new VMTransactionManager(this, this.preparedTransactionStore);@b@ }@b@@b@ if (this.containerManagerMap.isEmpty())@b@ makeDefaultContainerManagers();@b@@b@ getContainerManagers();@b@@b@ for (int i = 0; i < this.containerManagers.length; ++i) {@b@ this.containerManagers[i].setDeadLetterPolicy(this.deadLetterPolicy);@b@ this.containerManagers[i].start();@b@ }@b@ this.transactionManager.start();@b@ }@b@ }@b@@b@ public void stop()@b@ throws JMSException@b@ {@b@ if (this.started.commit(true, false)) {@b@ int i;@b@ ExceptionTemplate template = new ExceptionTemplate();@b@@b@ if (this.containerManagers != null)@b@ for (i = 0; i < this.containerManagers.length; ++i) {@b@ MessageContainerManager containerManager = this.containerManagers[i];@b@ template.run(new Callback(this, containerManager) { private final MessageContainerManager val$containerManager;@b@ private final DefaultBroker this$0;@b@@b@ public void execute() throws Throwable { this.val$containerManager.stop();@b@ }@b@ });@b@ }@b@@b@ if (this.transactionManager != null)@b@ template.run(new Callback(this) {@b@ private final DefaultBroker this$0;@b@@b@ public void execute() throws Throwable { DefaultBroker.access$000(this.this$0).stop();@b@ }@b@@b@ });@b@@b@ template.run(new Callback(this) {@b@ private final DefaultBroker this$0;@b@@b@ public void execute() throws Throwable { DefaultBroker.access$100(this.this$0).stop();@b@ }@b@@b@ });@b@ template.throwJMSException();@b@ }@b@ }@b@@b@ public void addClient(BrokerClient client, ConnectionInfo info)@b@ throws JMSException@b@ {@b@ if (this.securityAdapter != null)@b@ this.securityAdapter.authorizeConnection(client, info);@b@@b@ this.advisory.addConnection(client, info);@b@ }@b@@b@ public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {@b@ if (this.transactionManager != null)@b@ this.transactionManager.cleanUpClient(client);@b@@b@ this.advisory.removeConnection(client, info);@b@ }@b@@b@ public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {@b@ if (this.securityAdapter != null)@b@ this.securityAdapter.authorizeProducer(client, info);@b@@b@ this.advisory.addProducer(client, info);@b@ }@b@@b@ public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {@b@ this.advisory.removeProducer(client, info);@b@ }@b@@b@ public void addMessageConsumer(BrokerClient client, ConsumerInfo info)@b@ throws JMSException@b@ {@b@ validateConsumer(info);@b@ if (this.securityAdapter != null)@b@ this.securityAdapter.authorizeConsumer(client, info);@b@@b@ this.advisory.addAdvisory(client, info);@b@ MessageContainerManager[] array = getContainerManagers();@b@ for (int i = 0; i < array.length; ++i)@b@ array[i].addMessageConsumer(client, info);@b@@b@ fireConsumerInfo(client, info);@b@ this.messageConsumers.put(info, client);@b@ }@b@@b@ public void removeMessageConsumer(BrokerClient client, ConsumerInfo info)@b@ throws JMSException@b@ {@b@ validateConsumer(info);@b@ this.advisory.removeAdvisory(client, info);@b@ for (int i = 0; i < this.containerManagers.length; ++i)@b@ this.containerManagers[i].removeMessageConsumer(client, info);@b@@b@ fireConsumerInfo(client, info);@b@ this.messageConsumers.remove(info);@b@ }@b@@b@ public void sendMessage(BrokerClient client, ActiveMQMessage message)@b@ throws JMSException@b@ {@b@ checkValid();@b@ ActiveMQDestination destination = message.getJMSActiveMQDestination();@b@ if (destination == null)@b@ throw new JMSException("No destination specified for the Message");@b@@b@ if ((message.getJMSMessageID() == null) && (!(destination.isAdvisory())))@b@ throw new JMSException("No messageID specified for the Message");@b@@b@ associateTransaction(message);@b@ try {@b@ boolean first;@b@ Iterator iter;@b@ if (destination.isComposite()) {@b@ first = true;@b@ for (iter = destination.getChildDestinations().iterator(); iter.hasNext(); ) {@b@ ActiveMQDestination childDestination = (ActiveMQDestination)iter.next();@b@@b@ if (first) {@b@ first = false;@b@ }@b@ else@b@ message = message.shallowCopy();@b@@b@ message.setJMSDestination(childDestination);@b@ doMessageSend(client, message);@b@ }@b@ }@b@ else {@b@ if ((destination.isTempDestinationAdvisory()) && (!(client.isBrokerConnection())))@b@ this.advisory.processTempDestinationAdvisory(client, message);@b@@b@ doMessageSend(client, message);@b@ }@b@ }@b@ finally {@b@ disAssociateTransaction();@b@ }@b@ }@b@@b@ public void acknowledgeMessage(BrokerClient client, MessageAck ack)@b@ throws JMSException@b@ {@b@ int i;@b@ associateTransaction(ack);@b@ try {@b@ for (i = 0; i < this.containerManagers.length; ++i)@b@ this.containerManagers[i].acknowledgeMessage(client, ack);@b@ }@b@ finally {@b@ disAssociateTransaction();@b@ }@b@ }@b@@b@ public void deleteSubscription(String clientId, String subscriberName) throws JMSException@b@ {@b@ for (int i = 0; i < this.containerManagers.length; ++i)@b@ this.containerManagers[i].deleteSubscription(clientId, subscriberName);@b@ }@b@@b@ public void startTransaction(BrokerClient client, String transactionId)@b@ throws JMSException@b@ {@b@ this.transactionManager.createLocalTransaction(client, transactionId); }@b@@b@ public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {@b@ Transaction transaction;@b@ try {@b@ transaction = this.transactionManager.getLocalTransaction(transactionId);@b@ transaction.commit(true);@b@ }@b@ catch (XAException e)@b@ {@b@ throw ((JMSException)new JMSException(e.getMessage()).initCause(e));@b@ }@b@ }@b@@b@ public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException@b@ {@b@ Transaction transaction;@b@ try@b@ {@b@ transaction = this.transactionManager.getLocalTransaction(transactionId);@b@ transaction.rollback();@b@ }@b@ catch (XAException e)@b@ {@b@ throw ((JMSException)new JMSException(e.getMessage()).initCause(e));@b@ }@b@ }@b@@b@ public void startTransaction(BrokerClient client, ActiveMQXid xid)@b@ throws XAException@b@ {@b@ this.transactionManager.createXATransaction(client, xid);@b@ }@b@@b@ public int prepareTransaction(BrokerClient client, ActiveMQXid xid)@b@ throws XAException@b@ {@b@ Transaction transaction = this.transactionManager.getXATransaction(xid);@b@ return transaction.prepare();@b@ }@b@@b@ public void rollbackTransaction(BrokerClient client, ActiveMQXid xid)@b@ throws XAException@b@ {@b@ Transaction transaction = this.transactionManager.getXATransaction(xid);@b@ transaction.rollback();@b@ }@b@@b@ public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase)@b@ throws XAException@b@ {@b@ Transaction transaction = this.transactionManager.getXATransaction(xid);@b@ transaction.commit(onePhase);@b@ }@b@@b@ public ActiveMQXid[] getPreparedTransactions(BrokerClient client)@b@ throws XAException@b@ {@b@ return this.transactionManager.getPreparedXATransactions();@b@ }@b@@b@ public File getTempDir()@b@ {@b@ if (this.tempDir == null) {@b@ String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");@b@ this.tempDir = new File(dirName);@b@ }@b@ return this.tempDir;@b@ }@b@@b@ public String getBrokerName() {@b@ return this.brokerInfo.getBrokerName();@b@ }@b@@b@ public String getBrokerClusterName()@b@ {@b@ return this.brokerInfo.getClusterName();@b@ }@b@@b@ public void setTempDir(File tempDir)@b@ {@b@ this.tempDir = tempDir;@b@ }@b@@b@ public MessageContainerManager[] getContainerManagers() {@b@ if (this.containerManagers == null)@b@ this.containerManagers = createContainerManagers();@b@@b@ return this.containerManagers;@b@ }@b@@b@ public Map getContainerManagerMap() {@b@ return this.containerManagerMap;@b@ }@b@@b@ public void setContainerManagerMap(Map containerManagerMap) {@b@ this.containerManagerMap = containerManagerMap;@b@ this.containerManagers = null;@b@ }@b@@b@ public PersistenceAdapter getPersistenceAdapter() {@b@ return this.persistenceAdapter;@b@ }@b@@b@ public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {@b@ this.persistenceAdapter = persistenceAdapter;@b@ }@b@@b@ public TransactionManager getTransactionManager() {@b@ return this.transactionManager;@b@ }@b@@b@ public void setTransactionManager(TransactionManager transactionManager) {@b@ this.transactionManager = transactionManager;@b@ }@b@@b@ public SecurityAdapter getSecurityAdapter() {@b@ return this.securityAdapter;@b@ }@b@@b@ public void setSecurityAdapter(SecurityAdapter securityAdapter) {@b@ this.securityAdapter = securityAdapter;@b@ }@b@@b@ public RedeliveryPolicy getRedeliveryPolicy() {@b@ return this.redeliveryPolicy;@b@ }@b@@b@ public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {@b@ this.redeliveryPolicy = redeliveryPolicy;@b@ }@b@@b@ public TransactionStore getPreparedTransactionStore() {@b@ return this.preparedTransactionStore;@b@ }@b@@b@ public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) {@b@ this.preparedTransactionStore = preparedTransactionStore;@b@ }@b@@b@ public DeadLetterPolicy getDeadLetterPolicy()@b@ {@b@ return this.deadLetterPolicy;@b@ }@b@@b@ public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy)@b@ {@b@ this.deadLetterPolicy = deadLetterPolicy;@b@ }@b@@b@ public long getMaximumMemoryUsage()@b@ {@b@ return this.memoryManager.getValueLimit();@b@ }@b@@b@ public void setMaximumMemoryUsage(long maximumMemoryUsage)@b@ {@b@ this.memoryManager.setValueLimit(maximumMemoryUsage);@b@ }@b@@b@ public Context getDestinationContext(Hashtable environment)@b@ {@b@ Map data = new ConcurrentHashMap();@b@ for (Iterator iter = this.containerManagerMap.entrySet().iterator(); iter.hasNext(); ) {@b@ Map.Entry entry = (Map.Entry)iter.next();@b@ String name = entry.getKey().toString();@b@ MessageContainerManager manager = (MessageContainerManager)entry.getValue();@b@ Context context = new ReadOnlyContext(environment, manager.getDestinations());@b@ data.put(name, context);@b@ }@b@ return new ReadOnlyContext(environment, data);@b@ }@b@@b@ protected void doMessageSend(BrokerClient client, ActiveMQMessage message)@b@ throws JMSException@b@ {@b@ if (this.securityAdapter != null)@b@ this.securityAdapter.authorizeSendMessage(client, message);@b@@b@ ActiveMQDestination dest = message.getJMSActiveMQDestination();@b@ if (dest.isTopic()) {@b@ if ((message.isPersistent()) && (!(dest.isTemporary())))@b@ this.persistentTopicMCM.sendMessage(client, message);@b@@b@ this.transientTopicMCM.sendMessage(client, message);@b@ } else {@b@ this.transientQueueMCM.sendMessage(client, message);@b@ this.persistentQueueMCM.sendMessage(client, message);@b@ }@b@ }@b@@b@ protected PersistenceAdapter createPersistenceAdapter()@b@ throws JMSException@b@ {@b@ File directory = new File(getStoreDirectory());@b@@b@ PersistenceAdapter answer = null;@b@ String property = System.getProperty("activemq.persistenceAdapterFactory");@b@ if (property != null)@b@ answer = tryCreatePersistenceAdapter(property, directory, false);@b@@b@ if (answer == null)@b@ answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true);@b@@b@ if (answer != null) {@b@ return answer;@b@ }@b@@b@ log.warn("Default message store (journal+derby) could not be found in the classpath or property 'activemq.persistenceAdapterFactory' not specified so defaulting to use RAM based message persistence");@b@@b@ return new VMPersistenceAdapter();@b@ }@b@@b@ protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException@b@ {@b@ Class adapterClass = loadClass(className, ignoreErrors);@b@ if (adapterClass != null)@b@ try {@b@ PersistenceAdapterFactory factory = (PersistenceAdapterFactory)adapterClass.newInstance();@b@ PersistenceAdapter answer = factory.createPersistenceAdapter(directory, this.memoryManager);@b@ log.info("Persistence adapter created using: " + className);@b@ return answer;@b@ }@b@ catch (IOException cause) {@b@ throw createInstantiateAdapterException(className, cause);@b@ }@b@ catch (Throwable e) {@b@ if (!(ignoreErrors))@b@ throw createInstantiateAdapterException(className, e);@b@ }@b@@b@@b@ return null;@b@ }@b@@b@ protected JMSException createInstantiateAdapterException(String className, Throwable e) {@b@ return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: " + className + ". Reason: " + e, e);@b@ }@b@@b@ protected Class loadClass(String name, boolean ignoreErrors)@b@ throws JMSException@b@ {@b@ try@b@ {@b@ return Thread.currentThread().getContextClassLoader().loadClass(name);@b@ }@b@ catch (ClassNotFoundException e) {@b@ try {@b@ return getClass().getClassLoader().loadClass(name);@b@ }@b@ catch (ClassNotFoundException e2) {@b@ if (ignoreErrors) {@b@ log.trace("Could not find class: " + name + " on the classpath");@b@ return null;@b@ }@b@@b@ throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);@b@ }@b@ }@b@ }@b@@b@ protected String getStoreDirectory()@b@ {@b@ String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(getBrokerInfo().getBrokerName());@b@ return System.getProperty("activemq.store.dir", defaultDirectory);@b@ }@b@@b@ protected MessageContainerManager[] createContainerManagers()@b@ {@b@ int size = this.containerManagerMap.size();@b@ MessageContainerManager[] answer = new MessageContainerManager[size];@b@ this.containerManagerMap.values().toArray(answer);@b@ return answer;@b@ }@b@@b@ protected void makeDefaultContainerManagers() {@b@ this.transientTopicMCM = new TransientTopicBoundedMessageManager(this.queueManager);@b@ this.containerManagerMap.put("transientTopicContainer", this.transientTopicMCM);@b@ this.persistentTopicMCM = new DurableTopicMessageContainerManager(this.persistenceAdapter, this.redeliveryPolicy, this.deadLetterPolicy);@b@ this.containerManagerMap.put("persistentTopicContainer", this.persistentTopicMCM);@b@ this.persistentQueueMCM = new DurableQueueBoundedMessageManager(this.persistenceAdapter, this.queueManager, this.redeliveryPolicy, this.deadLetterPolicy);@b@ this.containerManagerMap.put("persistentQueueContainer", this.persistentQueueMCM);@b@ this.transientQueueMCM = new TransientQueueBoundedMessageManager(this.queueManager, this.redeliveryPolicy, this.deadLetterPolicy);@b@ this.containerManagerMap.put("transientQueueContainer", this.transientQueueMCM);@b@ }@b@@b@ protected void validateConsumer(ConsumerInfo info)@b@ throws JMSException@b@ {@b@ if (info.getConsumerId() == null)@b@ throw new JMSException("No consumerId specified for the ConsumerInfo");@b@ }@b@@b@ protected void checkValid() throws JMSException@b@ {@b@ if (this.containerManagers == null)@b@ throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");@b@ }@b@@b@ public void addConsumerInfoListener(ConsumerInfoListener l)@b@ {@b@ Iterator i;@b@ if (l != null) {@b@ this.consumerInfoListeners.add(l);@b@@b@ for (i = this.messageConsumers.entrySet().iterator(); i.hasNext(); ) {@b@ Map.Entry entry = (Map.Entry)i.next();@b@ ConsumerInfo info = (ConsumerInfo)entry.getKey();@b@ BrokerClient client = (BrokerClient)entry.getValue();@b@ l.onConsumerInfo(client, info);@b@ }@b@ }@b@ }@b@@b@ public void removeConsumerInfoListener(ConsumerInfoListener l)@b@ {@b@ this.consumerInfoListeners.remove(l);@b@ }@b@@b@ protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {@b@ for (Iterator i = this.consumerInfoListeners.iterator(); i.hasNext(); ) {@b@ ConsumerInfoListener l = (ConsumerInfoListener)i.next();@b@ l.onConsumerInfo(client, info);@b@ }@b@ }@b@@b@ public MessageContainerManager getPersistentTopicContainerManager()@b@ {@b@ return this.persistentTopicMCM;@b@ }@b@@b@ public MessageContainerManager getTransientTopicContainerManager()@b@ {@b@ return this.transientTopicMCM;@b@ }@b@@b@ public MessageContainerManager getPersistentQueueContainerManager()@b@ {@b@ return this.persistentQueueMCM;@b@ }@b@@b@ public MessageContainerManager getTransientQueueContainerManager()@b@ {@b@ return this.transientQueueMCM;@b@ }@b@@b@ public BrokerAdmin getBrokerAdmin()@b@ {@b@ return this;@b@ }@b@@b@ public void createMessageContainer(ActiveMQDestination dest) throws JMSException {@b@ for (int i = 0; i < this.containerManagers.length; ++i)@b@ this.containerManagers[i].createMessageContainer(dest);@b@ }@b@@b@ public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException@b@ {@b@ for (int i = 0; i < this.containerManagers.length; ++i)@b@ this.containerManagers[i].destroyMessageContainer(dest);@b@ }@b@@b@ public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException@b@ {@b@ for (int i = 0; i < this.containerManagers.length; ++i) {@b@ Map messageContainerAdmins = this.containerManagers[i].getMessageContainerAdmins();@b@ MessageContainerAdmin mca = (MessageContainerAdmin)messageContainerAdmins.get(dest);@b@ if (mca != null)@b@ return mca;@b@ }@b@@b@ return null;@b@ }@b@@b@ public MessageContainerAdmin[] listMessageContainerAdmin()@b@ throws JMSException@b@ {@b@ ArrayList l = new ArrayList();@b@ for (int i = 0; i < this.containerManagers.length; ++i) {@b@ Map messageContainerAdmins = this.containerManagers[i].getMessageContainerAdmins();@b@ for (Iterator iter = messageContainerAdmins.values().iterator(); iter.hasNext(); ) {@b@ MessageContainerAdmin mca = (MessageContainerAdmin)iter.next();@b@ l.add(mca);@b@ }@b@ }@b@@b@ MessageContainerAdmin[] answer = new MessageContainerAdmin[l.size()];@b@ l.toArray(answer);@b@ return answer;@b@ }@b@@b@ public void sendToDeadLetterQueue(String deadLetterName, ActiveMQMessage expiredMessage)@b@ throws JMSException@b@ {@b@ if (this.persistentQueueMCM != null) {@b@ Transaction original = TransactionManager.getContexTransaction();@b@ try {@b@ TransactionManager.setContexTransaction(null);@b@ this.persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, expiredMessage);@b@ log.debug(expiredMessage + " sent to DLQ: " + deadLetterName);@b@ } finally {@b@ TransactionManager.setContexTransaction(original);@b@ }@b@ }@b@ }@b@@b@ private final void associateTransaction(ActiveMQMessage message)@b@ throws JMSException@b@ {@b@ Transaction transaction;@b@ if (message.isPartOfTransaction()) {@b@ if (message.isXaTransacted())@b@ try {@b@ transaction = this.transactionManager.getXATransaction((ActiveMQXid)message.getTransactionId());@b@ }@b@ catch (XAException e) {@b@ throw ((JMSException)new JMSException(e.getMessage()).initCause(e));@b@ }@b@@b@ transaction = this.transactionManager.getLocalTransaction((String)message.getTransactionId());@b@ }@b@ else@b@ {@b@ transaction = null;@b@ }@b@ TransactionManager.setContexTransaction(transaction);@b@ }@b@@b@ private void disAssociateTransaction() {@b@ TransactionManager.setContexTransaction(null);@b@ }@b@@b@ private void associateTransaction(MessageAck ack)@b@ throws JMSException@b@ {@b@ Transaction transaction;@b@ if (ack.isPartOfTransaction()) {@b@ if (ack.isXaTransacted())@b@ try {@b@ transaction = this.transactionManager.getXATransaction((ActiveMQXid)ack.getTransactionId());@b@ }@b@ catch (XAException e) {@b@ throw ((JMSException)new JMSException(e.getMessage()).initCause(e));@b@ }@b@@b@ transaction = this.transactionManager.getLocalTransaction((String)ack.getTransactionId());@b@ }@b@ else@b@ {@b@ transaction = null;@b@ }@b@ TransactionManager.setContexTransaction(transaction);@b@ }@b@@b@ private String sanitizeString(String in) {@b@ String result = null;@b@ if (in != null) {@b@ result = in.replace(':', '_');@b@ result = result.replace('/', '_');@b@ result = result.replace('\\', '_');@b@ }@b@ return result;@b@ }@b@@b@ public MemoryBoundedObjectManager getMemoryManager()@b@ {@b@ return this.memoryManager;@b@ }@b@@b@ public MemoryBoundedQueueManager getQueueManager()@b@ {@b@ return this.queueManager;@b@ }@b@@b@ public String getName()@b@ {@b@ return getBrokerName();@b@ }@b@@b@ public String toString()@b@ {@b@ return "broker: " + getName();@b@ }@b@@b@ public BrokerInfo getBrokerInfo()@b@ {@b@ return this.brokerInfo;@b@ }@b@@b@ protected void setBrokercontainer(BrokerContainerImpl container) {@b@ this.brokerContainer = container;@b@ }@b@@b@ protected BrokerContainerImpl getBrokerContainer() {@b@ return this.brokerContainer;@b@ }@b@@b@ static TransactionManager access$000(DefaultBroker x0)@b@ {@b@ return x0.transactionManager; } @b@ static PersistenceAdapter access$100(DefaultBroker x0) { return x0.persistenceAdapter;@b@ }@b@}
3.容器监控
package org.activemq.capacity;@b@@b@import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;@b@import java.util.Iterator;@b@@b@public class DelegateCapacityMonitor@b@ implements CapacityMonitor@b@{@b@ String name;@b@ CapacityMonitor monitor;@b@ CopyOnWriteArrayList listeners = new CopyOnWriteArrayList();@b@@b@ public DelegateCapacityMonitor()@b@ {@b@ }@b@@b@ public DelegateCapacityMonitor(String name, CapacityMonitor cm)@b@ {@b@ this.name = name;@b@ this.monitor = cm;@b@ }@b@@b@ public void setDelegate(CapacityMonitor cm)@b@ {@b@ Iterator i;@b@ this.monitor = cm;@b@ if (cm != null)@b@ for (i = this.listeners.iterator(); i.hasNext(); ) {@b@ CapacityMonitorEventListener listener = (CapacityMonitorEventListener)i.next();@b@ cm.addCapacityEventListener(listener);@b@ }@b@ }@b@@b@ public String getName()@b@ {@b@ return this.name;@b@ }@b@@b@ public void setName(String newName)@b@ {@b@ this.name = newName;@b@ }@b@@b@ public int getRoundingFactor()@b@ {@b@ return ((this.monitor == null) ? 0 : this.monitor.getRoundingFactor());@b@ }@b@@b@ public void setRoundingFactor(int newRoundingFactor)@b@ {@b@ if (this.monitor != null)@b@ this.monitor.setRoundingFactor(newRoundingFactor);@b@ }@b@@b@ public void addCapacityEventListener(CapacityMonitorEventListener l)@b@ {@b@ this.listeners.add(l);@b@ if (this.monitor != null)@b@ this.monitor.addCapacityEventListener(l);@b@ }@b@@b@ public void removeCapacityEventListener(CapacityMonitorEventListener l)@b@ {@b@ this.listeners.remove(l);@b@ if (this.monitor != null)@b@ this.monitor.removeCapacityEventListener(l);@b@ }@b@@b@ public int getCurrentCapacity()@b@ {@b@ return ((this.monitor == null) ? 100 : this.monitor.getCurrentCapacity());@b@ }@b@@b@ public int getRoundedCapacity()@b@ {@b@ return ((this.monitor == null) ? 100 : this.monitor.getRoundedCapacity());@b@ }@b@@b@ public long getCurrentValue()@b@ {@b@ return ((this.monitor == null) ? 100L : this.monitor.getCurrentValue());@b@ }@b@@b@ public void setCurrentValue(long newCurrentValue)@b@ {@b@ if (this.monitor != null)@b@ this.monitor.setCurrentValue(newCurrentValue);@b@ }@b@@b@ public long getValueLimit()@b@ {@b@ return ((this.monitor == null) ? 100L : this.monitor.getValueLimit());@b@ }@b@@b@ public void setValueLimit(long newValueLimit)@b@ {@b@ if (this.monitor != null)@b@ this.monitor.setValueLimit(newValueLimit);@b@ }@b@@b@ public CapacityMonitorEvent generateCapacityMonitorEvent()@b@ {@b@ return ((this.monitor != null) ? this.monitor.generateCapacityMonitorEvent() : null);@b@ }@b@}
package org.activemq.capacity;@b@@b@public abstract interface CapacityMonitor@b@{@b@ public abstract String getName();@b@@b@ public abstract void setName(String paramString);@b@@b@ public abstract int getRoundingFactor();@b@@b@ public abstract void setRoundingFactor(int paramInt);@b@@b@ public abstract void addCapacityEventListener(CapacityMonitorEventListener paramCapacityMonitorEventListener);@b@@b@ public abstract void removeCapacityEventListener(CapacityMonitorEventListener paramCapacityMonitorEventListener);@b@@b@ public abstract int getCurrentCapacity();@b@@b@ public abstract int getRoundedCapacity();@b@@b@ public abstract long getCurrentValue();@b@@b@ public abstract void setCurrentValue(long paramLong);@b@@b@ public abstract long getValueLimit();@b@@b@ public abstract void setValueLimit(long paramLong);@b@@b@ public abstract CapacityMonitorEvent generateCapacityMonitorEvent();@b@@b@ public static class BasicCapacityMonitor@b@ {@b@ }@b@}