一、前言
实现事务(transaction)的方法有很多种。本文基于 datanucleus 开源包 datanucleus-core 中 org.datanucleus.transaction.Transaction(事务)与 org.datanucleus.transaction.TransactionManager(事务管理)的实现进行具体分析说明。该实现遵循 javax.transaction 定义的标准接口(javax.transaction.Synchronization、javax.transaction.xa.XAResource、javax.transaction.xa.Xid),具体源码参考如下。
二、源码说明
1.Transaction类
package org.datanucleus.transaction;@b@@b@import java.util.ArrayList;@b@import java.util.HashMap;@b@import java.util.Iterator;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Random;@b@import java.util.Set;@b@import javax.transaction.Synchronization;@b@import javax.transaction.xa.XAException;@b@import javax.transaction.xa.XAResource;@b@import javax.transaction.xa.Xid;@b@import org.datanucleus.ObjectManagerFactoryImpl;@b@import org.datanucleus.util.Localiser;@b@import org.datanucleus.util.NucleusLogger;@b@import org.omg.CORBA.SystemException;@b@@b@public class Transaction@b@{@b@ protected static final Localiser LOCALISER = Localiser.getInstance("org.datanucleus.Localisation", ObjectManagerFactoryImpl.class.getClassLoader());@b@ private static final int nodeId = new Random().nextInt();@b@ private static int nextGlobalTransactionId = 1;@b@ private int nextBranchId = 1;@b@ private final Xid xid;@b@ private int status;@b@ private boolean completing = false;@b@ private List<Synchronization> synchronization = new ArrayList();@b@ private List<XAResource> enlistedResources = new ArrayList();@b@ private Map<Object, XAResource> branches = new HashMap();@b@ private Map<XAResource, Xid> activeBranches = new HashMap();@b@ private Map<XAResource, Xid> suspendedResources = new HashMap();@b@@b@ Transaction()@b@ {@b@ this.xid = new XidImpl(nodeId, 0, nextGlobalTransactionId++);@b@ if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@ {@b@ NucleusLogger.TRANSACTION.debug("Transaction created " + toString());@b@ }@b@ }@b@@b@ public void commit()@b@ throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException@b@ {@b@ if (this.completing)@b@ {@b@ return;@b@ }@b@ if (this.status == 1)@b@ {@b@ rollback();@b@ return;@b@ }@b@@b@ try@b@ {@b@ Object key;@b@ XAResource resourceManager;@b@ this.completing = true;@b@ if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@ {@b@ 
NucleusLogger.TRANSACTION.debug("Committing " + toString());@b@ }@b@@b@ if (this.status != 0)@b@ {@b@ throw new IllegalStateException();@b@ }@b@@b@ Iterator syncIterator = this.synchronization.iterator();@b@ while (syncIterator.hasNext())@b@ {@b@ ((Synchronization)syncIterator.next()).beforeCompletion();@b@ }@b@@b@ List failures = null;@b@ boolean failed = false;@b@@b@ Iterator branchKeys = this.branches.keySet().iterator();@b@@b@ if (this.enlistedResources.size() == 1)@b@ {@b@ this.status = 8;@b@ while (branchKeys.hasNext())@b@ {@b@ key = branchKeys.next();@b@ resourceManager = (XAResource)this.branches.get(key);@b@ try@b@ {@b@ if (!(failed))@b@ {@b@ resourceManager.commit(this.xid, true);@b@ }@b@ else@b@ {@b@ resourceManager.rollback(this.xid);@b@ }@b@ }@b@ catch (Throwable e)@b@ {@b@ if (failures == null)@b@ {@b@ failures = new ArrayList();@b@ }@b@ failures.add(e);@b@ failed = true;@b@ this.status = 1;@b@ NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "commit", resourceManager, getXAErrorCode(e), toString()));@b@ }@b@ }@b@ if (!(failed))@b@ {@b@ this.status = 3;@b@ }@b@ else@b@ {@b@ this.status = 4;@b@ }@b@ }@b@ else if (this.enlistedResources.size() > 0)@b@ {@b@ this.status = 7;@b@ while ((!(failed)) && (branchKeys.hasNext()))@b@ {@b@ key = branchKeys.next();@b@ resourceManager = (XAResource)this.branches.get(key);@b@ try@b@ {@b@ resourceManager.prepare((Xid)key);@b@ }@b@ catch (Throwable e)@b@ {@b@ if (failures == null)@b@ {@b@ failures = new ArrayList();@b@ }@b@ failures.add(e);@b@ failed = true;@b@ this.status = 1;@b@ NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "prepare", resourceManager, getXAErrorCode(e), toString()));@b@ }@b@ }@b@@b@ if (!(failed))@b@ {@b@ this.status = 2;@b@ }@b@@b@ if (failed)@b@ {@b@ this.status = 9;@b@ failed = false;@b@@b@ branchKeys = this.branches.keySet().iterator();@b@ while (branchKeys.hasNext())@b@ {@b@ key = branchKeys.next();@b@ resourceManager = (XAResource)this.branches.get(key);@b@ try@b@ {@b@ 
resourceManager.rollback((Xid)key);@b@ }@b@ catch (Throwable e)@b@ {@b@ NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "rollback", resourceManager, getXAErrorCode(e), toString()));@b@ if (failures == null)@b@ {@b@ failures = new ArrayList();@b@ }@b@ failures.add(e);@b@ failed = true;@b@ }@b@ }@b@ this.status = 4;@b@ }@b@ else@b@ {@b@ this.status = 8;@b@@b@ branchKeys = this.branches.keySet().iterator();@b@ while (branchKeys.hasNext())@b@ {@b@ key = branchKeys.next();@b@ resourceManager = (XAResource)this.branches.get(key);@b@ try@b@ {@b@ resourceManager.commit((Xid)key, false);@b@ }@b@ catch (Throwable e)@b@ {@b@ NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "commit", resourceManager, getXAErrorCode(e), toString()));@b@ if (failures == null)@b@ {@b@ failures = new ArrayList();@b@ }@b@ failures.add(e);@b@ failed = true;@b@ }@b@ }@b@ this.status = 3;@b@ }@b@@b@ }@b@@b@ syncIterator = this.synchronization.iterator();@b@ while (syncIterator.hasNext())@b@ {@b@ ((Synchronization)syncIterator.next()).afterCompletion(this.status);@b@ }@b@@b@ if (this.status == 4)@b@ {@b@ if (failed)@b@ {@b@ if (failures.size() == 1)@b@ {@b@ throw new HeuristicRollbackException("Transaction rolled back due to failure during commit", (Exception)failures.get(0));@b@ }@b@@b@ throw new HeuristicRollbackException("Multiple failures");@b@ }@b@@b@ throw new RollbackException();@b@ }@b@@b@ if ((this.status == 3) && (failed))@b@ {@b@ throw new HeuristicMixedException();@b@ }@b@@b@ }@b@ finally@b@ {@b@ this.completing = false;@b@ }@b@ }@b@@b@ public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException@b@ {@b@ if (xaRes == null)@b@ {@b@ return false;@b@ }@b@@b@ if (this.status != 0)@b@ {@b@ throw new IllegalStateException();@b@ }@b@@b@ Xid xid = (Xid)this.activeBranches.get(xaRes);@b@@b@ if (xid == null)@b@ {@b@ throw new IllegalStateException();@b@ }@b@@b@ this.activeBranches.remove(xaRes);@b@@b@ if 
(NucleusLogger.TRANSACTION.isDebugEnabled())@b@ {@b@ NucleusLogger.TRANSACTION.debug(LOCALISER.msg("015039", "delist", xaRes, getXAFlag(flag), toString()));@b@ }@b@@b@ XAException exception = null;@b@ try@b@ {@b@ xaRes.end(xid, flag);@b@ }@b@ catch (XAException e)@b@ {@b@ exception = e;@b@ }@b@@b@ if (exception != null)@b@ {@b@ NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "delist", xaRes, getXAErrorCode(exception), toString()));@b@ return false;@b@ }@b@@b@ if (flag == 33554432)@b@ {@b@ this.suspendedResources.put(xaRes, xid);@b@ }@b@ return true;@b@ }@b@@b@ public boolean enlistResource(XAResource xaRes)@b@ throws RollbackException, IllegalStateException, SystemException@b@ {@b@ if (xaRes == null)@b@ {@b@ return false;@b@ }@b@@b@ if (this.status == 1)@b@ {@b@ throw new RollbackException();@b@ }@b@@b@ if (this.status != 0)@b@ {@b@ throw new IllegalStateException();@b@ }@b@@b@ Xid activeXid = (Xid)this.activeBranches.get(xaRes);@b@ if (activeXid != null)@b@ {@b@ return false;@b@ }@b@@b@ boolean alreadyEnlisted = false;@b@ int flag = 0;@b@@b@ Xid branchXid = (Xid)this.suspendedResources.get(xaRes);@b@@b@ if (branchXid == null)@b@ {@b@ Iterator enlistedIterator = this.enlistedResources.iterator();@b@ while ((!(alreadyEnlisted)) && (enlistedIterator.hasNext()))@b@ {@b@ XAResource resourceManager = (XAResource)enlistedIterator.next();@b@ try@b@ {@b@ if (resourceManager.isSameRM(xaRes))@b@ {@b@ flag = 2097152;@b@ alreadyEnlisted = true;@b@ }@b@ }@b@ catch (XAException e)@b@ {@b@ }@b@ }@b@@b@ branchXid = new XidImpl(this.nextBranchId++, this.xid.getFormatId(), this.xid.getGlobalTransactionId());@b@ }@b@ else@b@ {@b@ alreadyEnlisted = true;@b@ flag = 134217728;@b@ this.suspendedResources.remove(xaRes);@b@ }@b@@b@ if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@ {@b@ NucleusLogger.TRANSACTION.debug(LOCALISER.msg("015039", "enlist", xaRes, getXAFlag(flag), toString()));@b@ }@b@@b@ try@b@ {@b@ xaRes.start(branchXid, flag);@b@ }@b@ catch (XAException e)@b@ {@b@ 
NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "enlist", xaRes, getXAErrorCode(e), toString()));@b@ return false;@b@ }@b@@b@ if (!(alreadyEnlisted))@b@ {@b@ this.enlistedResources.add(xaRes);@b@ }@b@@b@ this.branches.put(branchXid, xaRes);@b@ this.activeBranches.put(xaRes, branchXid);@b@@b@ return true;@b@ }@b@@b@ public int getStatus() throws SystemException@b@ {@b@ return this.status;@b@ }@b@@b@ public void registerSynchronization(Synchronization sync) throws RollbackException, IllegalStateException, SystemException@b@ {@b@ if (sync == null)@b@ {@b@ return;@b@ }@b@ if (this.status == 1)@b@ {@b@ throw new RollbackException();@b@ }@b@ if (this.status != 0)@b@ {@b@ throw new IllegalStateException();@b@ }@b@ this.synchronization.add(sync);@b@ }@b@@b@ public void rollback() throws IllegalStateException, SystemException@b@ {@b@ if (this.completing)@b@ {@b@ return;@b@ }@b@ try@b@ {@b@ this.completing = true;@b@ if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@ {@b@ NucleusLogger.TRANSACTION.debug("Rolling back " + toString());@b@ }@b@@b@ if ((this.status != 0) && (this.status != 1))@b@ {@b@ throw new IllegalStateException();@b@ }@b@@b@ List failures = null;@b@@b@ Iterator branchKeys = this.branches.keySet().iterator();@b@@b@ this.status = 9;@b@ while (branchKeys.hasNext())@b@ {@b@ Xid xid = (Xid)branchKeys.next();@b@ XAResource resourceManager = (XAResource)this.branches.get(xid);@b@ try@b@ {@b@ resourceManager.rollback(xid);@b@ }@b@ catch (Throwable e)@b@ {@b@ if (failures == null)@b@ {@b@ failures = new ArrayList();@b@ }@b@ failures.add(e);@b@ NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "rollback", resourceManager, getXAErrorCode(e), toString()));@b@ }@b@ }@b@ this.status = 4;@b@@b@ Iterator syncIterator = this.synchronization.iterator();@b@ while (syncIterator.hasNext())@b@ {@b@ ((Synchronization)syncIterator.next()).afterCompletion(this.status);@b@ }@b@ }@b@ finally@b@ {@b@ this.completing = false;@b@ }@b@ }@b@@b@ public void setRollbackOnly() 
throws IllegalStateException, SystemException@b@ {@b@ this.status = 1;@b@ }@b@@b@ public static String getXAErrorCode(Throwable xae)@b@ {@b@ if (!(xae instanceof XAException))@b@ {@b@ return "UNKNOWN";@b@ }@b@ switch (((XAException)xae).errorCode)@b@ {@b@ case 7:@b@ return "XA_HEURCOM";@b@ case 8:@b@ return "XA_HEURHAZ";@b@ case 5:@b@ return "XA_HEURMIX";@b@ case 6:@b@ return "XA_HEURRB";@b@ case 9:@b@ return "XA_NOMIGRATE";@b@ case 100:@b@ return "XA_RBBASE";@b@ case 101:@b@ return "XA_RBCOMMFAIL";@b@ case 102:@b@ return "XA_RBBEADLOCK";@b@ case 107:@b@ return "XA_RBEND";@b@ case 103:@b@ return "XA_RBINTEGRITY";@b@ case 104:@b@ return "XA_RBOTHER";@b@ case 105:@b@ return "XA_RBPROTO";@b@ case 106:@b@ return "XA_RBTIMEOUT";@b@ case 3:@b@ return "XA_RDONLY";@b@ case 4:@b@ return "XA_RETRY";@b@ case -2:@b@ return "XAER_ASYNC";@b@ case -8:@b@ return "XAER_DUPID";@b@ case -5:@b@ return "XAER_INVAL";@b@ case -4:@b@ return "XAER_NOTA";@b@ case -9:@b@ return "XAER_OUTSIDE";@b@ case -6:@b@ return "XAER_PROTO";@b@ case -3:@b@ return "XAER_RMERR";@b@ case -7:@b@ return "XAER_RMFAIL";@b@ }@b@ return "UNKNOWN";@b@ }@b@@b@ private static String getXAFlag(int flag)@b@ {@b@ switch (flag)@b@ {@b@ case 8388608:@b@ return "TMENDRSCAN";@b@ case 536870912:@b@ return "TMFAIL";@b@ case 2097152:@b@ return "TMJOIN";@b@ case 0:@b@ return "TMNOFLAGS";@b@ case 1073741824:@b@ return "TMONEPHASE";@b@ case 134217728:@b@ return "TMRESUME";@b@ case 16777216:@b@ return "TMSTARTRSCAN";@b@ case 67108864:@b@ return "TMSUCCESS";@b@ case 33554432:@b@ return "TMSUSPEND";@b@ }@b@ return "UNKNOWN";@b@ }@b@@b@ public String toString()@b@ {@b@ return "[DataNucleus Transaction, ID=" + this.xid.toString() + ", enlisted resources=" + this.enlistedResources.toString() + "]";@b@ }@b@}
2.TransactionManager事务管理类
package org.datanucleus.transaction;@b@@b@import java.util.Hashtable;@b@import org.datanucleus.management.ManagementServer;@b@import org.datanucleus.management.runtime.TransactionRuntime;@b@import org.datanucleus.util.ClassUtils;@b@@b@public class TransactionManager@b@{@b@ private boolean containerManagedConnections;@b@ Hashtable<Object, Transaction> transactions;@b@ private TransactionRuntime txRuntime;@b@@b@ public TransactionManager()@b@ {@b@ this.containerManagedConnections = false;@b@@b@ this.transactions = new Hashtable();@b@@b@ this.txRuntime = null;@b@ }@b@@b@ public void setContainerManagedConnections(boolean flag) {@b@ this.containerManagedConnections = flag;@b@ }@b@@b@ public void registerMbean(String domainName, String instanceName, ManagementServer mgmtServer)@b@ {@b@ if (mgmtServer != null)@b@ {@b@ this.txRuntime = new TransactionRuntime();@b@ String mbeanName = domainName + ":InstanceName=" + instanceName + ",Type=" + ClassUtils.getClassNameForClass(this.txRuntime.getClass()) + ",Name=TransactionRuntime";@b@@b@ mgmtServer.registerMBean(this.txRuntime, mbeanName);@b@ }@b@ }@b@@b@ public TransactionRuntime getTransactionRuntime()@b@ {@b@ return this.txRuntime;@b@ }@b@@b@ public void begin(Object om)@b@ {@b@ Transaction tx = (Transaction)this.transactions.get(om);@b@ if (tx != null)@b@ {@b@ throw new NucleusTransactionException("Invalid state. Transaction has already started");@b@ }@b@ tx = new Transaction();@b@ this.transactions.put(om, tx);@b@ }@b@@b@ public void commit(Object om)@b@ {@b@ Transaction tx = (Transaction)this.transactions.get(om);@b@ if (tx == null)@b@ {@b@ throw new NucleusTransactionException("Invalid state. 
Transaction does not exist");@b@ }@b@ try@b@ {@b@ if (!(this.containerManagedConnections))@b@ {@b@ tx.commit();@b@ }@b@ }@b@ finally@b@ {@b@ this.transactions.remove(om);@b@ }@b@ }@b@@b@ public Transaction getTransaction(Object om)@b@ {@b@ if (om == null)@b@ {@b@ return null;@b@ }@b@ return ((Transaction)this.transactions.get(om));@b@ }@b@@b@ public void resume(Object om, Transaction tx)@b@ {@b@ throw new UnsupportedOperationException();@b@ }@b@@b@ public void rollback(Object om)@b@ {@b@ Transaction tx = (Transaction)this.transactions.get(om);@b@ if (tx == null)@b@ {@b@ throw new NucleusTransactionException("Invalid state. Transaction does not exist");@b@ }@b@ try@b@ {@b@ if (!(this.containerManagedConnections))@b@ {@b@ tx.rollback();@b@ }@b@ }@b@ finally@b@ {@b@ this.transactions.remove(om);@b@ }@b@ }@b@@b@ public void setRollbackOnly(Object om)@b@ {@b@ Transaction tx = (Transaction)this.transactions.get(om);@b@ if (tx == null)@b@ {@b@ throw new NucleusTransactionException("Invalid state. Transaction does not exist");@b@ }@b@ tx.setRollbackOnly();@b@ }@b@@b@ public void setTransactionTimeout(Object om, int millis)@b@ {@b@ throw new UnsupportedOperationException();@b@ }@b@@b@ public Transaction suspend(Object om)@b@ {@b@ throw new UnsupportedOperationException();@b@ }@b@}
3.XidImpl实现类(实现javax.transaction.xa.Xid接口定义)
package org.datanucleus.transaction;@b@@b@import javax.transaction.xa.Xid;@b@@b@public class XidImpl@b@ implements Xid@b@{@b@ byte[] branchQualifierBytes;@b@ int formatId;@b@ byte[] globalTransactionIdBytes;@b@@b@ public XidImpl(int branchQualifierBytes, int formatId, byte[] globalTransactionIdBytes)@b@ {@b@ byte[] buf = new byte[4];@b@ buf[0] = (byte)(branchQualifierBytes >>> 24 & 0xFF);@b@ buf[1] = (byte)(branchQualifierBytes >>> 16 & 0xFF);@b@ buf[2] = (byte)(branchQualifierBytes >>> 8 & 0xFF);@b@ buf[3] = (byte)(branchQualifierBytes & 0xFF);@b@ this.branchQualifierBytes = buf;@b@ this.formatId = formatId;@b@ this.globalTransactionIdBytes = globalTransactionIdBytes;@b@ }@b@@b@ public XidImpl(int branchQualifierBytes, int formatId, int globalTransactionIdBytes)@b@ {@b@ byte[] buf = new byte[4];@b@ buf[0] = (byte)(branchQualifierBytes >>> 24 & 0xFF);@b@ buf[1] = (byte)(branchQualifierBytes >>> 16 & 0xFF);@b@ buf[2] = (byte)(branchQualifierBytes >>> 8 & 0xFF);@b@ buf[3] = (byte)(branchQualifierBytes & 0xFF);@b@ this.branchQualifierBytes = buf;@b@ this.formatId = formatId;@b@ buf = new byte[4];@b@ buf[0] = (byte)(globalTransactionIdBytes >>> 24 & 0xFF);@b@ buf[1] = (byte)(globalTransactionIdBytes >>> 16 & 0xFF);@b@ buf[2] = (byte)(globalTransactionIdBytes >>> 8 & 0xFF);@b@ buf[3] = (byte)(globalTransactionIdBytes & 0xFF);@b@ this.globalTransactionIdBytes = buf;@b@ }@b@@b@ public byte[] getBranchQualifier()@b@ {@b@ return this.branchQualifierBytes;@b@ }@b@@b@ public int getFormatId()@b@ {@b@ return this.formatId;@b@ }@b@@b@ public byte[] getGlobalTransactionId()@b@ {@b@ return this.globalTransactionIdBytes;@b@ }@b@@b@ public String toString()@b@ {@b@ return "Xid=" + new String(this.globalTransactionIdBytes);@b@ }@b@}