首页

关于datanucleus的对于Transaction事务及事务管理TransactionManager实现源码解读分析

标签:datanucleus-core,事务管理,transaction,Synchronization,Xid,事务设计原理     发布时间:2018-01-16   

一、前言

实现事务transaction的方法有很多种,这边基于datanucleus开源包datanucleus-core中org.datanucleus.transaction.Transaction事务、org.datanucleus.transaction.TransactionManager事务管理的实现具体分析说明,当然实现事务过程遵循javax.transaction定义标准(javax.transaction.Synchronization、javax.transaction.xa.XAResource、javax.transaction.xa.Xid),具体参考如下。

二、源码说明

1.Transaction类

package org.datanucleus.transaction;@b@@b@import java.util.ArrayList;@b@import java.util.HashMap;@b@import java.util.Iterator;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Random;@b@import java.util.Set;@b@import javax.transaction.Synchronization;@b@import javax.transaction.xa.XAException;@b@import javax.transaction.xa.XAResource;@b@import javax.transaction.xa.Xid;@b@import org.datanucleus.ObjectManagerFactoryImpl;@b@import org.datanucleus.util.Localiser;@b@import org.datanucleus.util.NucleusLogger;@b@import org.omg.CORBA.SystemException;@b@@b@public class Transaction@b@{@b@  protected static final Localiser LOCALISER = Localiser.getInstance("org.datanucleus.Localisation", ObjectManagerFactoryImpl.class.getClassLoader());@b@  private static final int nodeId = new Random().nextInt();@b@  private static int nextGlobalTransactionId = 1;@b@  private int nextBranchId = 1;@b@  private final Xid xid;@b@  private int status;@b@  private boolean completing = false;@b@  private List<Synchronization> synchronization = new ArrayList();@b@  private List<XAResource> enlistedResources = new ArrayList();@b@  private Map<Object, XAResource> branches = new HashMap();@b@  private Map<XAResource, Xid> activeBranches = new HashMap();@b@  private Map<XAResource, Xid> suspendedResources = new HashMap();@b@@b@  Transaction()@b@  {@b@    this.xid = new XidImpl(nodeId, 0, nextGlobalTransactionId++);@b@    if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@    {@b@      NucleusLogger.TRANSACTION.debug("Transaction created " + toString());@b@    }@b@  }@b@@b@  public void commit()@b@    throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException@b@  {@b@    if (this.completing)@b@    {@b@      return;@b@    }@b@    if (this.status == 1)@b@    {@b@      rollback();@b@      return;@b@    }@b@@b@    try@b@    {@b@      Object key;@b@      XAResource resourceManager;@b@      this.completing = true;@b@      if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@      {@b@        NucleusLogger.TRANSACTION.debug("Committing " + toString());@b@      }@b@@b@      if (this.status != 0)@b@      {@b@        throw new IllegalStateException();@b@      }@b@@b@      Iterator syncIterator = this.synchronization.iterator();@b@      while (syncIterator.hasNext())@b@      {@b@        ((Synchronization)syncIterator.next()).beforeCompletion();@b@      }@b@@b@      List failures = null;@b@      boolean failed = false;@b@@b@      Iterator branchKeys = this.branches.keySet().iterator();@b@@b@      if (this.enlistedResources.size() == 1)@b@      {@b@        this.status = 8;@b@        while (branchKeys.hasNext())@b@        {@b@          key = branchKeys.next();@b@          resourceManager = (XAResource)this.branches.get(key);@b@          try@b@          {@b@            if (!(failed))@b@            {@b@              resourceManager.commit(this.xid, true);@b@            }@b@            else@b@            {@b@              resourceManager.rollback(this.xid);@b@            }@b@          }@b@          catch (Throwable e)@b@          {@b@            if (failures == null)@b@            {@b@              failures = new ArrayList();@b@            }@b@            failures.add(e);@b@            failed = true;@b@            this.status = 1;@b@            NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "commit", resourceManager, getXAErrorCode(e), toString()));@b@          }@b@        }@b@        if (!(failed))@b@        {@b@          this.status = 3;@b@        }@b@        else@b@        {@b@          this.status = 4;@b@        }@b@      }@b@      else if (this.enlistedResources.size() > 0)@b@      {@b@        this.status = 7;@b@        while ((!(failed)) && (branchKeys.hasNext()))@b@        {@b@          key = branchKeys.next();@b@          resourceManager = (XAResource)this.branches.get(key);@b@          try@b@          {@b@            resourceManager.prepare((Xid)key);@b@          }@b@          catch (Throwable e)@b@          {@b@            if (failures == null)@b@            {@b@              failures = new ArrayList();@b@            }@b@            failures.add(e);@b@            failed = true;@b@            this.status = 1;@b@            NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "prepare", resourceManager, getXAErrorCode(e), toString()));@b@          }@b@        }@b@@b@        if (!(failed))@b@        {@b@          this.status = 2;@b@        }@b@@b@        if (failed)@b@        {@b@          this.status = 9;@b@          failed = false;@b@@b@          branchKeys = this.branches.keySet().iterator();@b@          while (branchKeys.hasNext())@b@          {@b@            key = branchKeys.next();@b@            resourceManager = (XAResource)this.branches.get(key);@b@            try@b@            {@b@              resourceManager.rollback((Xid)key);@b@            }@b@            catch (Throwable e)@b@            {@b@              NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "rollback", resourceManager, getXAErrorCode(e), toString()));@b@              if (failures == null)@b@              {@b@                failures = new ArrayList();@b@              }@b@              failures.add(e);@b@              failed = true;@b@            }@b@          }@b@          this.status = 4;@b@        }@b@        else@b@        {@b@          this.status = 8;@b@@b@          branchKeys = this.branches.keySet().iterator();@b@          while (branchKeys.hasNext())@b@          {@b@            key = branchKeys.next();@b@            resourceManager = (XAResource)this.branches.get(key);@b@            try@b@            {@b@              resourceManager.commit((Xid)key, false);@b@            }@b@            catch (Throwable e)@b@            {@b@              NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "commit", resourceManager, getXAErrorCode(e), toString()));@b@              if (failures == null)@b@              {@b@                failures = new ArrayList();@b@              }@b@              failures.add(e);@b@              failed = true;@b@            }@b@          }@b@          this.status = 3;@b@        }@b@@b@      }@b@@b@      syncIterator = this.synchronization.iterator();@b@      while (syncIterator.hasNext())@b@      {@b@        ((Synchronization)syncIterator.next()).afterCompletion(this.status);@b@      }@b@@b@      if (this.status == 4)@b@      {@b@        if (failed)@b@        {@b@          if (failures.size() == 1)@b@          {@b@            throw new HeuristicRollbackException("Transaction rolled back due to failure during commit", (Exception)failures.get(0));@b@          }@b@@b@          throw new HeuristicRollbackException("Multiple failures");@b@        }@b@@b@        throw new RollbackException();@b@      }@b@@b@      if ((this.status == 3) && (failed))@b@      {@b@        throw new HeuristicMixedException();@b@      }@b@@b@    }@b@    finally@b@    {@b@      this.completing = false;@b@    }@b@  }@b@@b@  public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException@b@  {@b@    if (xaRes == null)@b@    {@b@      return false;@b@    }@b@@b@    if (this.status != 0)@b@    {@b@      throw new IllegalStateException();@b@    }@b@@b@    Xid xid = (Xid)this.activeBranches.get(xaRes);@b@@b@    if (xid == null)@b@    {@b@      throw new IllegalStateException();@b@    }@b@@b@    this.activeBranches.remove(xaRes);@b@@b@    if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@    {@b@      NucleusLogger.TRANSACTION.debug(LOCALISER.msg("015039", "delist", xaRes, getXAFlag(flag), toString()));@b@    }@b@@b@    XAException exception = null;@b@    try@b@    {@b@      xaRes.end(xid, flag);@b@    }@b@    catch (XAException e)@b@    {@b@      exception = e;@b@    }@b@@b@    if (exception != null)@b@    {@b@      NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "delist", xaRes, getXAErrorCode(exception), toString()));@b@      return false;@b@    }@b@@b@    if (flag == 33554432)@b@    {@b@      this.suspendedResources.put(xaRes, xid);@b@    }@b@    return true;@b@  }@b@@b@  public boolean enlistResource(XAResource xaRes)@b@    throws RollbackException, IllegalStateException, SystemException@b@  {@b@    if (xaRes == null)@b@    {@b@      return false;@b@    }@b@@b@    if (this.status == 1)@b@    {@b@      throw new RollbackException();@b@    }@b@@b@    if (this.status != 0)@b@    {@b@      throw new IllegalStateException();@b@    }@b@@b@    Xid activeXid = (Xid)this.activeBranches.get(xaRes);@b@    if (activeXid != null)@b@    {@b@      return false;@b@    }@b@@b@    boolean alreadyEnlisted = false;@b@    int flag = 0;@b@@b@    Xid branchXid = (Xid)this.suspendedResources.get(xaRes);@b@@b@    if (branchXid == null)@b@    {@b@      Iterator enlistedIterator = this.enlistedResources.iterator();@b@      while ((!(alreadyEnlisted)) && (enlistedIterator.hasNext()))@b@      {@b@        XAResource resourceManager = (XAResource)enlistedIterator.next();@b@        try@b@        {@b@          if (resourceManager.isSameRM(xaRes))@b@          {@b@            flag = 2097152;@b@            alreadyEnlisted = true;@b@          }@b@        }@b@        catch (XAException e)@b@        {@b@        }@b@      }@b@@b@      branchXid = new XidImpl(this.nextBranchId++, this.xid.getFormatId(), this.xid.getGlobalTransactionId());@b@    }@b@    else@b@    {@b@      alreadyEnlisted = true;@b@      flag = 134217728;@b@      this.suspendedResources.remove(xaRes);@b@    }@b@@b@    if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@    {@b@      NucleusLogger.TRANSACTION.debug(LOCALISER.msg("015039", "enlist", xaRes, getXAFlag(flag), toString()));@b@    }@b@@b@    try@b@    {@b@      xaRes.start(branchXid, flag);@b@    }@b@    catch (XAException e)@b@    {@b@      NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "enlist", xaRes, getXAErrorCode(e), toString()));@b@      return false;@b@    }@b@@b@    if (!(alreadyEnlisted))@b@    {@b@      this.enlistedResources.add(xaRes);@b@    }@b@@b@    this.branches.put(branchXid, xaRes);@b@    this.activeBranches.put(xaRes, branchXid);@b@@b@    return true;@b@  }@b@@b@  public int getStatus() throws SystemException@b@  {@b@    return this.status;@b@  }@b@@b@  public void registerSynchronization(Synchronization sync) throws RollbackException, IllegalStateException, SystemException@b@  {@b@    if (sync == null)@b@    {@b@      return;@b@    }@b@    if (this.status == 1)@b@    {@b@      throw new RollbackException();@b@    }@b@    if (this.status != 0)@b@    {@b@      throw new IllegalStateException();@b@    }@b@    this.synchronization.add(sync);@b@  }@b@@b@  public void rollback() throws IllegalStateException, SystemException@b@  {@b@    if (this.completing)@b@    {@b@      return;@b@    }@b@    try@b@    {@b@      this.completing = true;@b@      if (NucleusLogger.TRANSACTION.isDebugEnabled())@b@      {@b@        NucleusLogger.TRANSACTION.debug("Rolling back " + toString());@b@      }@b@@b@      if ((this.status != 0) && (this.status != 1))@b@      {@b@        throw new IllegalStateException();@b@      }@b@@b@      List failures = null;@b@@b@      Iterator branchKeys = this.branches.keySet().iterator();@b@@b@      this.status = 9;@b@      while (branchKeys.hasNext())@b@      {@b@        Xid xid = (Xid)branchKeys.next();@b@        XAResource resourceManager = (XAResource)this.branches.get(xid);@b@        try@b@        {@b@          resourceManager.rollback(xid);@b@        }@b@        catch (Throwable e)@b@        {@b@          if (failures == null)@b@          {@b@            failures = new ArrayList();@b@          }@b@          failures.add(e);@b@          NucleusLogger.TRANSACTION.error(LOCALISER.msg("015038", "rollback", resourceManager, getXAErrorCode(e), toString()));@b@        }@b@      }@b@      this.status = 4;@b@@b@      Iterator syncIterator = this.synchronization.iterator();@b@      while (syncIterator.hasNext())@b@      {@b@        ((Synchronization)syncIterator.next()).afterCompletion(this.status);@b@      }@b@    }@b@    finally@b@    {@b@      this.completing = false;@b@    }@b@  }@b@@b@  public void setRollbackOnly() throws IllegalStateException, SystemException@b@  {@b@    this.status = 1;@b@  }@b@@b@  public static String getXAErrorCode(Throwable xae)@b@  {@b@    if (!(xae instanceof XAException))@b@    {@b@      return "UNKNOWN";@b@    }@b@    switch (((XAException)xae).errorCode)@b@    {@b@    case 7:@b@      return "XA_HEURCOM";@b@    case 8:@b@      return "XA_HEURHAZ";@b@    case 5:@b@      return "XA_HEURMIX";@b@    case 6:@b@      return "XA_HEURRB";@b@    case 9:@b@      return "XA_NOMIGRATE";@b@    case 100:@b@      return "XA_RBBASE";@b@    case 101:@b@      return "XA_RBCOMMFAIL";@b@    case 102:@b@      return "XA_RBBEADLOCK";@b@    case 107:@b@      return "XA_RBEND";@b@    case 103:@b@      return "XA_RBINTEGRITY";@b@    case 104:@b@      return "XA_RBOTHER";@b@    case 105:@b@      return "XA_RBPROTO";@b@    case 106:@b@      return "XA_RBTIMEOUT";@b@    case 3:@b@      return "XA_RDONLY";@b@    case 4:@b@      return "XA_RETRY";@b@    case -2:@b@      return "XAER_ASYNC";@b@    case -8:@b@      return "XAER_DUPID";@b@    case -5:@b@      return "XAER_INVAL";@b@    case -4:@b@      return "XAER_NOTA";@b@    case -9:@b@      return "XAER_OUTSIDE";@b@    case -6:@b@      return "XAER_PROTO";@b@    case -3:@b@      return "XAER_RMERR";@b@    case -7:@b@      return "XAER_RMFAIL";@b@    }@b@    return "UNKNOWN";@b@  }@b@@b@  private static String getXAFlag(int flag)@b@  {@b@    switch (flag)@b@    {@b@    case 8388608:@b@      return "TMENDRSCAN";@b@    case 536870912:@b@      return "TMFAIL";@b@    case 2097152:@b@      return "TMJOIN";@b@    case 0:@b@      return "TMNOFLAGS";@b@    case 1073741824:@b@      return "TMONEPHASE";@b@    case 134217728:@b@      return "TMRESUME";@b@    case 16777216:@b@      return "TMSTARTRSCAN";@b@    case 67108864:@b@      return "TMSUCCESS";@b@    case 33554432:@b@      return "TMSUSPEND";@b@    }@b@    return "UNKNOWN";@b@  }@b@@b@  public String toString()@b@  {@b@    return "[DataNucleus Transaction, ID=" + this.xid.toString() + ", enlisted resources=" + this.enlistedResources.toString() + "]";@b@  }@b@}

2.TransactionManager事务管理类

package org.datanucleus.transaction;@b@@b@import java.util.Hashtable;@b@import org.datanucleus.management.ManagementServer;@b@import org.datanucleus.management.runtime.TransactionRuntime;@b@import org.datanucleus.util.ClassUtils;@b@@b@public class TransactionManager@b@{@b@  private boolean containerManagedConnections;@b@  Hashtable<Object, Transaction> transactions;@b@  private TransactionRuntime txRuntime;@b@@b@  public TransactionManager()@b@  {@b@    this.containerManagedConnections = false;@b@@b@    this.transactions = new Hashtable();@b@@b@    this.txRuntime = null;@b@  }@b@@b@  public void setContainerManagedConnections(boolean flag) {@b@    this.containerManagedConnections = flag;@b@  }@b@@b@  public void registerMbean(String domainName, String instanceName, ManagementServer mgmtServer)@b@  {@b@    if (mgmtServer != null)@b@    {@b@      this.txRuntime = new TransactionRuntime();@b@      String mbeanName = domainName + ":InstanceName=" + instanceName + ",Type=" + ClassUtils.getClassNameForClass(this.txRuntime.getClass()) + ",Name=TransactionRuntime";@b@@b@      mgmtServer.registerMBean(this.txRuntime, mbeanName);@b@    }@b@  }@b@@b@  public TransactionRuntime getTransactionRuntime()@b@  {@b@    return this.txRuntime;@b@  }@b@@b@  public void begin(Object om)@b@  {@b@    Transaction tx = (Transaction)this.transactions.get(om);@b@    if (tx != null)@b@    {@b@      throw new NucleusTransactionException("Invalid state. Transaction has already started");@b@    }@b@    tx = new Transaction();@b@    this.transactions.put(om, tx);@b@  }@b@@b@  public void commit(Object om)@b@  {@b@    Transaction tx = (Transaction)this.transactions.get(om);@b@    if (tx == null)@b@    {@b@      throw new NucleusTransactionException("Invalid state. Transaction does not exist");@b@    }@b@    try@b@    {@b@      if (!(this.containerManagedConnections))@b@      {@b@        tx.commit();@b@      }@b@    }@b@    finally@b@    {@b@      this.transactions.remove(om);@b@    }@b@  }@b@@b@  public Transaction getTransaction(Object om)@b@  {@b@    if (om == null)@b@    {@b@      return null;@b@    }@b@    return ((Transaction)this.transactions.get(om));@b@  }@b@@b@  public void resume(Object om, Transaction tx)@b@  {@b@    throw new UnsupportedOperationException();@b@  }@b@@b@  public void rollback(Object om)@b@  {@b@    Transaction tx = (Transaction)this.transactions.get(om);@b@    if (tx == null)@b@    {@b@      throw new NucleusTransactionException("Invalid state. Transaction does not exist");@b@    }@b@    try@b@    {@b@      if (!(this.containerManagedConnections))@b@      {@b@        tx.rollback();@b@      }@b@    }@b@    finally@b@    {@b@      this.transactions.remove(om);@b@    }@b@  }@b@@b@  public void setRollbackOnly(Object om)@b@  {@b@    Transaction tx = (Transaction)this.transactions.get(om);@b@    if (tx == null)@b@    {@b@      throw new NucleusTransactionException("Invalid state. Transaction does not exist");@b@    }@b@    tx.setRollbackOnly();@b@  }@b@@b@  public void setTransactionTimeout(Object om, int millis)@b@  {@b@    throw new UnsupportedOperationException();@b@  }@b@@b@  public Transaction suspend(Object om)@b@  {@b@    throw new UnsupportedOperationException();@b@  }@b@}

3.XidImpl实现类(实现javax.transaction.xa.Xid接口定义)

package org.datanucleus.transaction;@b@@b@import javax.transaction.xa.Xid;@b@@b@public class XidImpl@b@  implements Xid@b@{@b@  byte[] branchQualifierBytes;@b@  int formatId;@b@  byte[] globalTransactionIdBytes;@b@@b@  public XidImpl(int branchQualifierBytes, int formatId, byte[] globalTransactionIdBytes)@b@  {@b@    byte[] buf = new byte[4];@b@    buf[0] = (byte)(branchQualifierBytes >>> 24 & 0xFF);@b@    buf[1] = (byte)(branchQualifierBytes >>> 16 & 0xFF);@b@    buf[2] = (byte)(branchQualifierBytes >>> 8 & 0xFF);@b@    buf[3] = (byte)(branchQualifierBytes & 0xFF);@b@    this.branchQualifierBytes = buf;@b@    this.formatId = formatId;@b@    this.globalTransactionIdBytes = globalTransactionIdBytes;@b@  }@b@@b@  public XidImpl(int branchQualifierBytes, int formatId, int globalTransactionIdBytes)@b@  {@b@    byte[] buf = new byte[4];@b@    buf[0] = (byte)(branchQualifierBytes >>> 24 & 0xFF);@b@    buf[1] = (byte)(branchQualifierBytes >>> 16 & 0xFF);@b@    buf[2] = (byte)(branchQualifierBytes >>> 8 & 0xFF);@b@    buf[3] = (byte)(branchQualifierBytes & 0xFF);@b@    this.branchQualifierBytes = buf;@b@    this.formatId = formatId;@b@    buf = new byte[4];@b@    buf[0] = (byte)(globalTransactionIdBytes >>> 24 & 0xFF);@b@    buf[1] = (byte)(globalTransactionIdBytes >>> 16 & 0xFF);@b@    buf[2] = (byte)(globalTransactionIdBytes >>> 8 & 0xFF);@b@    buf[3] = (byte)(globalTransactionIdBytes & 0xFF);@b@    this.globalTransactionIdBytes = buf;@b@  }@b@@b@  public byte[] getBranchQualifier()@b@  {@b@    return this.branchQualifierBytes;@b@  }@b@@b@  public int getFormatId()@b@  {@b@    return this.formatId;@b@  }@b@@b@  public byte[] getGlobalTransactionId()@b@  {@b@    return this.globalTransactionIdBytes;@b@  }@b@@b@  public String toString()@b@  {@b@    return "Xid=" + new String(this.globalTransactionIdBytes);@b@  }@b@}