一、前言
本文基于 accumulo-server 1.5.2 版本,分析其中基于 ZooKeeper 实现主服务节点 Master 的设计与实现。主要涉及的代码包括 org.apache.accumulo.server.master.Master、org.apache.accumulo.server.zookeeper.ZooLock、org.apache.accumulo.server.master.recovery.RecoveryManager 等。
二、源代码说明
1. Master 代码(以下代码为反编译结果,其中 `@b@` 表示原始代码中的换行,`access$000`、`???` 等为反编译器产生的痕迹,不能直接编译)
package org.apache.accumulo.server.master;@b@@b@import java.io.IOException;@b@import java.net.InetSocketAddress;@b@import java.nio.ByteBuffer;@b@import java.util.ArrayList;@b@import java.util.Collection;@b@import java.util.Collections;@b@import java.util.HashMap;@b@import java.util.HashSet;@b@import java.util.Iterator;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import java.util.Set;@b@import java.util.SortedMap;@b@import java.util.TreeMap;@b@import java.util.TreeSet;@b@import java.util.concurrent.CountDownLatch;@b@import java.util.concurrent.atomic.AtomicBoolean;@b@import java.util.concurrent.atomic.AtomicInteger;@b@import org.apache.accumulo.core.Constants;@b@import org.apache.accumulo.core.client.AccumuloException;@b@import org.apache.accumulo.core.client.AccumuloSecurityException;@b@import org.apache.accumulo.core.client.BatchWriter;@b@import org.apache.accumulo.core.client.BatchWriterConfig;@b@import org.apache.accumulo.core.client.Connector;@b@import org.apache.accumulo.core.client.Instance;@b@import org.apache.accumulo.core.client.IsolatedScanner;@b@import org.apache.accumulo.core.client.MutationsRejectedException;@b@import org.apache.accumulo.core.client.RowIterator;@b@import org.apache.accumulo.core.client.Scanner;@b@import org.apache.accumulo.core.client.TableNotFoundException;@b@import org.apache.accumulo.core.client.admin.TimeType;@b@import org.apache.accumulo.core.client.impl.Tables;@b@import org.apache.accumulo.core.client.impl.ThriftTransportPool;@b@import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;@b@import org.apache.accumulo.core.client.impl.thrift.TableOperation;@b@import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;@b@import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;@b@import org.apache.accumulo.core.conf.AccumuloConfiguration;@b@import org.apache.accumulo.core.conf.Property;@b@import org.apache.accumulo.core.data.Key;@b@import 
org.apache.accumulo.core.data.KeyExtent;@b@import org.apache.accumulo.core.data.Mutation;@b@import org.apache.accumulo.core.data.PartialKey;@b@import org.apache.accumulo.core.data.Range;@b@import org.apache.accumulo.core.data.Value;@b@import org.apache.accumulo.core.data.thrift.TKeyExtent;@b@import org.apache.accumulo.core.file.FileUtil;@b@import org.apache.accumulo.core.iterators.IteratorUtil;@b@import org.apache.accumulo.core.master.state.tables.TableState;@b@import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;@b@import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;@b@import org.apache.accumulo.core.master.thrift.MasterGoalState;@b@import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;@b@import org.apache.accumulo.core.master.thrift.MasterState;@b@import org.apache.accumulo.core.master.thrift.TableInfo;@b@import org.apache.accumulo.core.master.thrift.TableOperation;@b@import org.apache.accumulo.core.master.thrift.TabletLoadState;@b@import org.apache.accumulo.core.master.thrift.TabletServerStatus;@b@import org.apache.accumulo.core.master.thrift.TabletSplit;@b@import org.apache.accumulo.core.security.SecurityUtil;@b@import org.apache.accumulo.core.security.thrift.TCredentials;@b@import org.apache.accumulo.core.util.AddressUtil;@b@import org.apache.accumulo.core.util.ByteBufferUtil;@b@import org.apache.accumulo.core.util.CachedConfiguration;@b@import org.apache.accumulo.core.util.ColumnFQ;@b@import org.apache.accumulo.core.util.Daemon;@b@import org.apache.accumulo.core.util.UtilWaitThread;@b@import org.apache.accumulo.core.zookeeper.ZooUtil;@b@import org.apache.accumulo.fate.AgeOffStore;@b@import org.apache.accumulo.fate.Fate;@b@import org.apache.accumulo.fate.ReadOnlyTStore.TStatus;@b@import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;@b@import org.apache.accumulo.fate.zookeeper.ZooLock.AsyncLockWatcher;@b@import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;@b@import 
org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator;@b@import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;@b@import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;@b@import org.apache.accumulo.server.Accumulo;@b@import org.apache.accumulo.server.client.HdfsZooInstance;@b@import org.apache.accumulo.server.conf.ServerConfiguration;@b@import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;@b@import org.apache.accumulo.server.master.balancer.TabletBalancer;@b@import org.apache.accumulo.server.master.recovery.RecoveryManager;@b@import org.apache.accumulo.server.master.state.Assignment;@b@import org.apache.accumulo.server.master.state.CurrentState;@b@import org.apache.accumulo.server.master.state.DeadServerList;@b@import org.apache.accumulo.server.master.state.DistributedStoreException;@b@import org.apache.accumulo.server.master.state.MergeInfo;@b@import org.apache.accumulo.server.master.state.MergeInfo.Operation;@b@import org.apache.accumulo.server.master.state.MergeState;@b@import org.apache.accumulo.server.master.state.MergeStats;@b@import org.apache.accumulo.server.master.state.MetaDataStateStore;@b@import org.apache.accumulo.server.master.state.RootTabletStateStore;@b@import org.apache.accumulo.server.master.state.TServerInstance;@b@import org.apache.accumulo.server.master.state.TableCounts;@b@import org.apache.accumulo.server.master.state.TableStats;@b@import org.apache.accumulo.server.master.state.TabletLocationState;@b@import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;@b@import org.apache.accumulo.server.master.state.TabletMigration;@b@import org.apache.accumulo.server.master.state.TabletServerState;@b@import org.apache.accumulo.server.master.state.TabletState;@b@import org.apache.accumulo.server.master.state.TabletStateStore;@b@import org.apache.accumulo.server.master.state.ZooTabletStateStore;@b@import 
org.apache.accumulo.server.master.state.tables.TableManager;@b@import org.apache.accumulo.server.master.state.tables.TableObserver;@b@import org.apache.accumulo.server.master.tableOps.BulkImport;@b@import org.apache.accumulo.server.master.tableOps.CancelCompactions;@b@import org.apache.accumulo.server.master.tableOps.ChangeTableState;@b@import org.apache.accumulo.server.master.tableOps.CloneTable;@b@import org.apache.accumulo.server.master.tableOps.CompactRange;@b@import org.apache.accumulo.server.master.tableOps.CreateTable;@b@import org.apache.accumulo.server.master.tableOps.DeleteTable;@b@import org.apache.accumulo.server.master.tableOps.ExportTable;@b@import org.apache.accumulo.server.master.tableOps.ImportTable;@b@import org.apache.accumulo.server.master.tableOps.RenameTable;@b@import org.apache.accumulo.server.master.tableOps.TableRangeOp;@b@import org.apache.accumulo.server.master.tableOps.TraceRepo;@b@import org.apache.accumulo.server.master.tserverOps.ShutdownTServer;@b@import org.apache.accumulo.server.monitor.Monitor;@b@import org.apache.accumulo.server.security.AuditedSecurityOperation;@b@import org.apache.accumulo.server.security.SecurityConstants;@b@import org.apache.accumulo.server.security.SecurityOperation;@b@import org.apache.accumulo.server.tabletserver.TabletTime;@b@import org.apache.accumulo.server.trace.TraceFileSystem;@b@import org.apache.accumulo.server.util.AddressUtil;@b@import org.apache.accumulo.server.util.DefaultMap;@b@import org.apache.accumulo.server.util.Halt;@b@import org.apache.accumulo.server.util.MetadataTable;@b@import org.apache.accumulo.server.util.SystemPropUtil;@b@import org.apache.accumulo.server.util.TServerUtils;@b@import org.apache.accumulo.server.util.TServerUtils.ServerPort;@b@import org.apache.accumulo.server.util.TablePropUtil;@b@import org.apache.accumulo.server.util.TabletIterator.TabletDeletedException;@b@import org.apache.accumulo.server.util.time.SimpleTimer;@b@import 
org.apache.accumulo.server.zookeeper.ZooLock;@b@import org.apache.accumulo.server.zookeeper.ZooReaderWriter;@b@import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;@b@import org.apache.accumulo.trace.instrument.thrift.TraceWrap;@b@import org.apache.accumulo.trace.thrift.TInfo;@b@import org.apache.hadoop.fs.FileSystem;@b@import org.apache.hadoop.io.DataInputBuffer;@b@import org.apache.hadoop.io.DataOutputBuffer;@b@import org.apache.hadoop.io.Text;@b@import org.apache.log4j.Logger;@b@import org.apache.thrift.TException;@b@import org.apache.thrift.server.TServer;@b@import org.apache.thrift.transport.TTransportException;@b@import org.apache.zookeeper.KeeperException;@b@import org.apache.zookeeper.KeeperException.NoNodeException;@b@import org.apache.zookeeper.WatchedEvent;@b@import org.apache.zookeeper.Watcher;@b@import org.apache.zookeeper.data.Stat;@b@@b@public class Master@b@ implements LiveTServerSet.Listener, TableObserver, CurrentState@b@{@b@ private static final Logger log = Logger.getLogger(Master.class);@b@ private static final int ONE_SECOND = 1000;@b@ private static final Text METADATA_TABLE_ID = new Text("!0");@b@ private static final long TIME_TO_WAIT_BETWEEN_SCANS = 60000L;@b@ private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 300000L;@b@ private static final long WAIT_BETWEEN_ERRORS = 1000L;@b@ private static final long DEFAULT_WAIT_FOR_WATCHER = 10000L;@b@ private static final int MAX_CLEANUP_WAIT_TIME = 1000;@b@ private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = 1000;@b@ private static final int MAX_TSERVER_WORK_CHUNK = 5000;@b@ private static final int MAX_BAD_STATUS_COUNT = 3;@b@ private final FileSystem fs;@b@ private final Instance instance;@b@ private final java.lang.String hostname;@b@ private final LiveTServerSet tserverSet;@b@ private final List<TabletGroupWatcher> watchers = new ArrayList();@b@ private final SecurityOperation security;@b@ private final Map<TServerInstance, AtomicInteger> badServers = 
Collections.synchronizedMap(new DefaultMap(new AtomicInteger()));@b@ private final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet());@b@ private final SortedMap<KeyExtent, TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap());@b@ private final EventCoordinator nextEvent = new EventCoordinator();@b@ private final Object mergeLock = new Object();@b@ private RecoveryManager recoveryManager = null;@b@ private ZooLock masterLock = null;@b@ private TServer clientService = null;@b@ private TabletBalancer tabletBalancer;@b@ private MasterState state = MasterState.INITIAL;@b@ private Fate<Master> fate;@b@ private volatile SortedMap<TServerInstance, TabletServerStatus> tserverStatus = Collections.unmodifiableSortedMap(new TreeMap());@b@ private final Set<java.lang.String> recoveriesInProgress = Collections.synchronizedSet(new HashSet());@b@ static final boolean X = 1;@b@ static final boolean _ = 0;@b@ static final boolean[][] transitionOK = { { true, true, false, false, false, false, true }, { false, true, true, true, false, false, true }, { false, false, true, true, true, false, true }, { false, false, true, true, true, false, true }, { false, false, true, true, true, true, true }, { false, false, false, true, false, true, true }, { false, false, false, false, false, false, true } };@b@ private boolean haveUpgradedZooKeeper = false;@b@ private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);@b@ private final CountDownLatch waitForMetadataUpgrade = new CountDownLatch(1);@b@ private final ServerConfiguration serverConfig;@b@ static final java.lang.String I_DONT_KNOW_WHY = "unexpected failure";@b@@b@ private synchronized MasterState getMasterState()@b@ {@b@ return this.state;@b@ }@b@@b@ public boolean stillMaster() {@b@ return (getMasterState() != MasterState.STOP);@b@ }@b@@b@ private synchronized void setMasterState(MasterState newState)@b@ {@b@ if (this.state.equals(newState))@b@ return;@b@ if 
(transitionOK[this.state.ordinal()][newState.ordinal()] == 0)@b@ log.error("Programmer error: master should not transition from " + this.state + " to " + newState);@b@@b@ MasterState oldState = this.state;@b@ this.state = newState;@b@ this.nextEvent.event("State changed from %s to %s", new Object[] { oldState, newState });@b@ if (newState == MasterState.STOP)@b@ {@b@ SimpleTimer.getInstance().schedule(new Runnable(this)@b@ {@b@ public void run()@b@ {@b@ Master.access$000(this.this$0).stop();@b@ Master.access$100(this.this$0).event("stopped event loop", new Object[0]);@b@ }@b@ }@b@ , 100L, 1000L);@b@ }@b@@b@ if ((oldState != newState) && (newState == MasterState.HAVE_LOCK)) {@b@ upgradeZookeeper();@b@ }@b@@b@ if ((oldState != newState) && (newState == MasterState.NORMAL))@b@ upgradeMetadata();@b@ }@b@@b@ private void upgradeZookeeper()@b@ {@b@ if (Accumulo.getAccumuloPersistentVersion(this.fs) == 4)@b@ {@b@ if (null != this.fate)@b@ throw new IllegalStateException("Access to Fate should not have been initialized prior to the Master transitioning to active. 
Please save all logs and file a bug.");@b@@b@ Accumulo.abortIfFateTransactions();@b@ try {@b@ log.info("Upgrading zookeeper");@b@@b@ IZooReaderWriter zoo = ZooReaderWriter.getInstance();@b@@b@ zoo.recursiveDelete(ZooUtil.getRoot(this.instance) + "/loggers", ZooUtil.NodeMissingPolicy.SKIP);@b@ zoo.recursiveDelete(ZooUtil.getRoot(this.instance) + "/dead/loggers", ZooUtil.NodeMissingPolicy.SKIP);@b@@b@ zoo.putPersistentData(ZooUtil.getRoot(this.instance) + "/recovery", new byte[] { 48 }, ZooUtil.NodeExistsPolicy.SKIP);@b@@b@ for (java.lang.String id : Tables.getIdToNameMap(this.instance).keySet())@b@ {@b@ zoo.putPersistentData(ZooUtil.getRoot(this.instance) + "/tables" + "/" + id + "/compact-cancel-id", "0".getBytes(Constants.UTF8), ZooUtil.NodeExistsPolicy.SKIP);@b@ }@b@@b@ this.haveUpgradedZooKeeper = true;@b@ } catch (Exception ex) {@b@ log.fatal("Error performing upgrade", ex);@b@ System.exit(1);@b@ }@b@ }@b@ }@b@@b@ private void upgradeMetadata()@b@ {@b@ if (this.upgradeMetadataRunning.compareAndSet(false, true))@b@ if (Accumulo.getAccumuloPersistentVersion(this.fs) == 4)@b@ {@b@ if (!(this.haveUpgradedZooKeeper))@b@ throw new IllegalStateException("We should only attempt to upgrade Accumulo's !METADATA table if we've already upgraded ZooKeeper. Please save all logs and file a bug.");@b@@b@ if (null != this.fate)@b@ throw new IllegalStateException("Access to Fate should not have been initialized prior to the Master finishing upgrades. 
Please save all logs and file a bug.");@b@@b@ Runnable upgradeTask = new Runnable(this)@b@ {@b@ public void run() {@b@ try {@b@ Master.access$200().info("Starting to upgrade !METADATA table.");@b@ MetadataTable.moveMetaDeleteMarkers(Master.access$300(this.this$0), SecurityConstants.getSystemCredentials());@b@ Master.access$200().info("Updating persistent data version.");@b@ Accumulo.updateAccumuloVersion(Master.access$400(this.this$0));@b@ Master.access$200().info("Upgrade complete");@b@ Master.access$500(this.this$0).countDown();@b@ } catch (Exception ex) {@b@ Master.access$200().fatal("Error performing upgrade", ex);@b@ System.exit(1);@b@ }@b@@b@ }@b@@b@ };@b@ new Thread(upgradeTask).start();@b@ } else {@b@ this.waitForMetadataUpgrade.countDown();@b@ }@b@ }@b@@b@ private int assignedOrHosted(Text tableId)@b@ {@b@ int result = 0;@b@ for (TabletGroupWatcher watcher : this.watchers) {@b@ TableCounts count = watcher.getStats(tableId);@b@ result += count.hosted() + count.assigned();@b@ }@b@ return result;@b@ }@b@@b@ private int totalAssignedOrHosted() {@b@ int result = 0;@b@ for (TabletGroupWatcher watcher : this.watchers)@b@ for (TableCounts counts : watcher.getStats().values())@b@ result += counts.assigned() + counts.hosted();@b@@b@@b@ return result;@b@ }@b@@b@ private int nonMetaDataTabletsAssignedOrHosted() {@b@ return (totalAssignedOrHosted() - assignedOrHosted(new Text("!0")));@b@ }@b@@b@ private int notHosted() {@b@ int result = 0;@b@ for (TabletGroupWatcher watcher : this.watchers)@b@ for (TableCounts counts : watcher.getStats().values())@b@ result += counts.assigned() + counts.assignedToDeadServers();@b@@b@@b@ return result;@b@ }@b@@b@ private int displayUnassigned()@b@ {@b@ Iterator i$;@b@ TabletGroupWatcher watcher;@b@ TableManager manager;@b@ int result = 0;@b@ Text meta = new Text("!0");@b@ switch (5.$SwitchMap$org$apache$accumulo$core$master$thrift$MasterState[getMasterState().ordinal()])@b@ {@b@ case 1:@b@ for (i$ = this.watchers.iterator(); 
i$.hasNext(); ) { watcher = (TabletGroupWatcher)i$.next();@b@ manager = TableManager.getInstance();@b@ for (Map.Entry entry : watcher.getStats().entrySet()) {@b@ Text tableId = (Text)entry.getKey();@b@ TableCounts counts = (TableCounts)entry.getValue();@b@ TableState tableState = manager.getTableState(tableId.toString());@b@ if ((tableState != null) && (tableState.equals(TableState.ONLINE)))@b@ result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned();@b@ }@b@ }@b@@b@ break;@b@ case 2:@b@ for (i$ = this.watchers.iterator(); i$.hasNext(); ) { watcher = (TabletGroupWatcher)i$.next();@b@ result += watcher.getStats(meta).unassigned();@b@ }@b@ break;@b@ case 3:@b@ case 4:@b@ for (i$ = this.watchers.iterator(); i$.hasNext(); ) { watcher = (TabletGroupWatcher)i$.next();@b@ result += watcher.getStats(meta).unassigned();@b@ }@b@@b@ }@b@@b@ return result;@b@ }@b@@b@ private void checkNotMetadataTable(java.lang.String tableName, TableOperation operation) throws org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException {@b@ if (tableName.compareTo("!METADATA") == 0) {@b@ java.lang.String why = "Table names cannot be == !METADATA";@b@ log.warn(why);@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.OTHER, why);@b@ }@b@ }@b@@b@ private void checkTableName(java.lang.String tableName, TableOperation operation) throws org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException {@b@ java.lang.String why;@b@ if (!(tableName.matches("^\\w+$"))) {@b@ why = "Table names must only contain word characters (letters, digits, and underscores): " + tableName;@b@ log.warn(why);@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.OTHER, why);@b@ }@b@ if (Tables.getNameToIdMap(HdfsZooInstance.getInstance()).containsKey(tableName)) {@b@ why = "Table name already 
exists: " + tableName;@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.EXISTS, why);@b@ }@b@ }@b@@b@ public void mustBeOnline(java.lang.String tableId) throws org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException@b@ {@b@ Tables.clearCache(this.instance);@b@ if (!(Tables.getTableState(this.instance, tableId).equals(TableState.ONLINE)))@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, null, TableOperation.MERGE, TableOperationExceptionType.OFFLINE, "table is not online");@b@ }@b@@b@ public Connector getConnector() throws AccumuloException, AccumuloSecurityException {@b@ return this.instance.getConnector("!SYSTEM", SecurityConstants.getSystemToken());@b@ }@b@@b@ private void waitAround(EventCoordinator.Listener listener) {@b@ listener.waitForEvents(1000L);@b@ }@b@@b@ public static <T> T createInstanceFromPropertyName(AccumuloConfiguration conf, Property property, Class<T> base, T defaultInstance)@b@ {@b@ java.lang.String clazzName = conf.get(property);@b@ Object instance = null;@b@ try@b@ {@b@ Class clazz = AccumuloVFSClassLoader.loadClass(clazzName, base);@b@ instance = clazz.newInstance();@b@ log.info("Loaded class : " + clazzName);@b@ } catch (Exception e) {@b@ log.warn("Failed to load class ", e);@b@ }@b@@b@ if (instance == null) {@b@ log.info("Using " + defaultInstance.getClass().getName());@b@ instance = defaultInstance;@b@ }@b@ return instance;@b@ }@b@@b@ public Master(ServerConfiguration config, FileSystem fs, java.lang.String hostname) throws IOException {@b@ this.serverConfig = config;@b@ this.instance = config.getInstance();@b@ this.fs = TraceFileSystem.wrap(fs);@b@ this.hostname = hostname;@b@@b@ AccumuloConfiguration aconf = this.serverConfig.getConfiguration();@b@@b@ log.info("Version 1.5.2");@b@ log.info("Instance " + this.instance.getInstanceID());@b@ 
ThriftTransportPool.getInstance().setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));@b@ this.security = AuditedSecurityOperation.getInstance();@b@ this.tserverSet = new LiveTServerSet(this.instance, config.getConfiguration(), this);@b@ this.tabletBalancer = ((TabletBalancer)createInstanceFromPropertyName(aconf, Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer()));@b@ this.tabletBalancer.init(this.serverConfig);@b@ }@b@@b@ public LiveTServerSet.TServerConnection getConnection(TServerInstance server) {@b@ try {@b@ return this.tserverSet.getConnection(server); } catch (TException ex) {@b@ }@b@ return null;@b@ }@b@@b@ public MergeInfo getMergeInfo(KeyExtent tablet)@b@ {@b@ if (tablet.isRootTablet())@b@ return new MergeInfo();@b@ return getMergeInfo(tablet.getTableId());@b@ }@b@@b@ public MergeInfo getMergeInfo(Text tableId) {@b@ Object localObject1;@b@ monitorenter;@b@ try {@b@ java.lang.String path = ZooUtil.getRoot(this.instance.getInstanceID()) + "/tables" + "/" + tableId.toString() + "/merge";@b@ if (!(ZooReaderWriter.getInstance().exists(path)))@b@ return new MergeInfo();@b@ byte[] data = ZooReaderWriter.getInstance().getData(path, new Stat());@b@ DataInputBuffer in = new DataInputBuffer();@b@ in.reset(data, data.length);@b@ MergeInfo info = new MergeInfo();@b@ info.readFields(in);@b@ return info;@b@ } catch (KeeperException.NoNodeException ex) {@b@ log.info("Error reading merge state, it probably just finished");@b@ return new MergeInfo();@b@ } catch (Exception ex) {@b@ log.warn("Unexpected error reading merge state", ex);@b@ return new MergeInfo();@b@ } finally {@b@ monitorexit; }@b@ }@b@@b@ public void setMergeState(MergeInfo info, MergeState state) throws IOException, KeeperException, InterruptedException {@b@ synchronized (this.mergeLock) {@b@ java.lang.String path = ZooUtil.getRoot(this.instance.getInstanceID()) + "/tables" + "/" + info.getRange().getTableId().toString() + "/merge";@b@ info.setState(state);@b@ if 
(state.equals(MergeState.NONE)) {@b@ ZooReaderWriter.getInstance().recursiveDelete(path, ZooUtil.NodeMissingPolicy.SKIP);@b@ } else {@b@ DataOutputBuffer out = new DataOutputBuffer();@b@ try {@b@ info.write(out);@b@ } catch (IOException ex) {@b@ throw new RuntimeException("Unlikely", ex);@b@ }@b@ ZooReaderWriter.getInstance().putPersistentData(path, out.getData(), (state.equals(MergeState.STARTED)) ? ZooUtil.NodeExistsPolicy.FAIL : ZooUtil.NodeExistsPolicy.OVERWRITE);@b@ }@b@@b@ this.mergeLock.notifyAll();@b@ }@b@ this.nextEvent.event("Merge state of %s set to %s", new Object[] { info.getRange(), state });@b@ }@b@@b@ public void clearMergeState(Text tableId) throws IOException, KeeperException, InterruptedException {@b@ synchronized (this.mergeLock) {@b@ java.lang.String path = ZooUtil.getRoot(this.instance.getInstanceID()) + "/tables" + "/" + tableId.toString() + "/merge";@b@ ZooReaderWriter.getInstance().recursiveDelete(path, ZooUtil.NodeMissingPolicy.SKIP);@b@ this.mergeLock.notifyAll();@b@ }@b@ this.nextEvent.event("Merge state of %s cleared", new Object[] { tableId });@b@ }@b@@b@ private void setMasterGoalState(MasterGoalState state) {@b@ try {@b@ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(this.instance) + "/masters/goal_state", state.name().getBytes(Constants.UTF8), ZooUtil.NodeExistsPolicy.OVERWRITE);@b@ }@b@ catch (Exception ex) {@b@ log.error("Unable to set master goal state in zookeeper");@b@ }@b@ }@b@@b@ MasterGoalState getMasterGoalState() {@b@ byte[] data;@b@ try {@b@ data = ZooReaderWriter.getInstance().getData(ZooUtil.getRoot(this.instance) + "/masters/goal_state", null);@b@ return MasterGoalState.valueOf(new java.lang.String(data, Constants.UTF8));@b@ } catch (Exception e) {@b@ log.error("Problem getting real goal state: " + e);@b@ UtilWaitThread.sleep(1000L);@b@ }@b@ }@b@@b@ private void shutdown(boolean stopTabletServers) {@b@ if (stopTabletServers) {@b@ setMasterGoalState(MasterGoalState.CLEAN_STOP);@b@ 
EventCoordinator.Listener eventListener = this.nextEvent.getListener();@b@ do@b@ waitAround(eventListener);@b@ while (this.tserverSet.size() > 0);@b@ }@b@ setMasterState(MasterState.STOP);@b@ }@b@@b@ public boolean hasCycled(long time) {@b@ for (TabletGroupWatcher watcher : this.watchers)@b@ if (watcher.stats.lastScanFinished() < time)@b@ return false;@b@@b@@b@ return true;@b@ }@b@@b@ public void clearMigrations(java.lang.String tableId) {@b@ synchronized (this.migrations) {@b@ Iterator iterator = this.migrations.keySet().iterator();@b@ while (iterator.hasNext()) {@b@ KeyExtent extent = (KeyExtent)iterator.next();@b@ if (extent.getTableId().toString().equals(tableId))@b@ iterator.remove();@b@ }@b@ }@b@ }@b@@b@ TabletGoalState getSystemGoalState(TabletLocationState tls)@b@ {@b@ switch (5.$SwitchMap$org$apache$accumulo$core$master$thrift$MasterState[getMasterState().ordinal()])@b@ {@b@ case 1:@b@ return TabletGoalState.HOSTED;@b@ case 2:@b@ case 5:@b@ case 6:@b@ if (tls.extent.isMeta())@b@ return TabletGoalState.HOSTED;@b@ return TabletGoalState.UNASSIGNED;@b@ case 3:@b@ if (tls.extent.isRootTablet())@b@ return TabletGoalState.HOSTED;@b@ return TabletGoalState.UNASSIGNED;@b@ case 4:@b@ return TabletGoalState.UNASSIGNED;@b@ case 7:@b@ return TabletGoalState.UNASSIGNED;@b@ }@b@@b@ return TabletGoalState.HOSTED;@b@ }@b@@b@ TabletGoalState getTableGoalState(KeyExtent extent) {@b@ TableState tableState = TableManager.getInstance().getTableState(extent.getTableId().toString());@b@ if (tableState == null)@b@ return TabletGoalState.DELETED;@b@ switch (5.$SwitchMap$org$apache$accumulo$core$master$state$tables$TableState[tableState.ordinal()])@b@ {@b@ case 1:@b@ return TabletGoalState.DELETED;@b@ case 2:@b@ case 3:@b@ return TabletGoalState.UNASSIGNED;@b@ }@b@ return TabletGoalState.HOSTED;@b@ }@b@@b@ TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo)@b@ {@b@ KeyExtent extent = tls.extent;@b@@b@ TabletGoalState state = getSystemGoalState(tls);@b@ if 
(state == TabletGoalState.HOSTED) {@b@ if ((tls.current != null) && (this.serversToShutdown.contains(tls.current))) {@b@ return TabletGoalState.UNASSIGNED;@b@ }@b@@b@ if (mergeInfo.getRange() != null) {@b@ log.debug("mergeInfo overlaps: " + extent + " " + mergeInfo.overlaps(extent));@b@ if (mergeInfo.overlaps(extent))@b@ switch (5.$SwitchMap$org$apache$accumulo$server$master$state$MergeState[mergeInfo.getState().ordinal()])@b@ {@b@ case 1:@b@ case 2:@b@ break;@b@ case 3:@b@ case 4:@b@ return TabletGoalState.HOSTED;@b@ case 5:@b@ if (tls.getState(onlineTabletServers()).equals(TabletState.HOSTED)) {@b@ if (!(tls.chopped)) break label214;@b@ return TabletGoalState.UNASSIGNED;@b@ }@b@ if ((tls.chopped) && (tls.walogs.isEmpty())) {@b@ return TabletGoalState.UNASSIGNED;@b@ }@b@@b@ return TabletGoalState.HOSTED;@b@ case 6:@b@ case 7:@b@ label214: return TabletGoalState.UNASSIGNED;@b@ }@b@@b@@b@ }@b@@b@ state = getTableGoalState(extent);@b@ if (state == TabletGoalState.HOSTED)@b@ {@b@ TServerInstance dest = (TServerInstance)this.migrations.get(extent);@b@ if ((dest != null) && (tls.current != null) && (!(dest.equals(tls.current))))@b@ return TabletGoalState.UNASSIGNED;@b@ }@b@ }@b@@b@ return state;@b@ }@b@@b@ private SortedMap<TServerInstance, TabletServerStatus> gatherTableInformation()@b@ {@b@ long start = System.currentTimeMillis();@b@ SortedMap result = new TreeMap();@b@ Set currentServers = this.tserverSet.getCurrentServers();@b@ for (??? 
= currentServers.iterator(); ???.hasNext(); ) { TServerInstance server = (TServerInstance)???.next();@b@ try {@b@ Thread t = Thread.currentThread();@b@ java.lang.String oldName = t.getName();@b@ try {@b@ t.setName("Getting status from " + server);@b@ LiveTServerSet.TServerConnection connection = this.tserverSet.getConnection(server);@b@ if (connection == null)@b@ throw new IOException("No connection to " + server);@b@ TabletServerStatus status = connection.getTableMap(false);@b@ result.put(server, status);@b@ } finally {@b@ t.setName(oldName);@b@ }@b@ } catch (Exception ex) {@b@ log.error("unable to get tablet server status " + server + " " + ex.toString());@b@ log.debug("unable to get tablet server status " + server, ex);@b@ if (((AtomicInteger)this.badServers.get(server)).incrementAndGet() > 3) {@b@ log.warn("attempting to stop " + server);@b@ try {@b@ LiveTServerSet.TServerConnection connection = this.tserverSet.getConnection(server);@b@ if (connection != null)@b@ connection.halt(this.masterLock);@b@ } catch (TTransportException e) {@b@ }@b@ catch (Exception e) {@b@ log.info("error talking to troublesome tablet server ", e);@b@ }@b@ this.badServers.remove(server);@b@ this.tserverSet.remove(server);@b@ }@b@ }@b@ }@b@ synchronized (this.badServers) {@b@ this.badServers.keySet().retainAll(currentServers);@b@ this.badServers.keySet().removeAll(result.keySet());@b@ }@b@ log.debug(java.lang.String.format("Finished gathering information from %d servers in %.2f seconds", new Object[] { Integer.valueOf(result.size()), Double.valueOf((System.currentTimeMillis() - start) / 1000.0D) }));@b@ return result;@b@ }@b@@b@ public void run() throws IOException, InterruptedException, KeeperException {@b@ java.lang.String zroot = ZooUtil.getRoot(this.instance);@b@@b@ getMasterLock(zroot + "/masters/lock");@b@@b@ this.recoveryManager = new RecoveryManager(this);@b@@b@ TableManager.getInstance().addObserver(this);@b@@b@ StatusThread statusThread = new StatusThread(this, null);@b@ 
statusThread.start();@b@@b@ MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread(this, null);@b@ migrationCleanupThread.start();@b@@b@ this.tserverSet.startListeningForTabletServerChanges();@b@@b@ ZooReaderWriter.getInstance().getChildren(zroot + "/recovery", new Watcher(this, zroot)@b@ {@b@ public void process() {@b@ Master.access$100(this.this$0).event("Noticed recovery changes", new Object[] { event.getType() });@b@ try@b@ {@b@ ZooReaderWriter.getInstance().getChildren(this.val$zroot + "/recovery", this);@b@ } catch (Exception e) {@b@ Master.access$200().error("Failed to add log recovery watcher back", e);@b@ }@b@@b@ }@b@@b@ });@b@ TCredentials systemAuths = SecurityConstants.getSystemCredentials();@b@ TabletStateStore[] stores = { new ZooTabletStateStore(new org.apache.accumulo.server.master.state.ZooStore(zroot)), new RootTabletStateStore(this.instance, systemAuths, this), new MetaDataStateStore(this.instance, systemAuths, this) };@b@@b@ this.watchers.add(new TabletGroupWatcher(this, stores[2], null));@b@ this.watchers.add(new TabletGroupWatcher(this, stores[1], (TabletGroupWatcher)this.watchers.get(0)));@b@ this.watchers.add(new TabletGroupWatcher(this, stores[0], (TabletGroupWatcher)this.watchers.get(1)));@b@ for (TabletGroupWatcher watcher : this.watchers) {@b@ watcher.start();@b@ }@b@@b@ this.waitForMetadataUpgrade.await();@b@ try@b@ {@b@ AgeOffStore store = new AgeOffStore(new org.apache.accumulo.fate.ZooStore(ZooUtil.getRoot(this.instance) + "/fate", ZooReaderWriter.getRetryingInstance()), 28800000L);@b@@b@ int threads = getConfiguration().getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);@b@@b@ this.fate = new Fate(this, store, threads);@b@@b@ SimpleTimer.getInstance().schedule(new Runnable(this, store)@b@ {@b@ public void run()@b@ {@b@ this.val$store.ageOff();@b@ }@b@ }@b@ , 63000L, 63000L);@b@ }@b@ catch (KeeperException e)@b@ {@b@ throw new IOException(e);@b@ } catch (InterruptedException e) {@b@ throw new 
IOException(e);@b@ }@b@@b@ MasterClientService.Processor processor = new MasterClientService.Processor((MasterClientService.Iface)TraceWrap.service(new MasterClientServiceHandler(this, null)));@b@ TServerUtils.ServerPort serverPort = TServerUtils.startServer(getSystemConfiguration(), Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);@b@@b@ this.clientService = serverPort.server;@b@ InetSocketAddress sock = AddressUtil.parseAddress(this.hostname, serverPort.port);@b@ java.lang.String address = AddressUtil.toString(sock);@b@ log.info("Setting master lock data to " + address);@b@ this.masterLock.replaceLockData(address.getBytes(Constants.UTF8));@b@@b@ while (!(this.clientService.isServing()))@b@ UtilWaitThread.sleep(100L);@b@@b@ while (this.clientService.isServing()) {@b@ UtilWaitThread.sleep(500L);@b@ }@b@@b@ long deadline = System.currentTimeMillis() + 1000L;@b@ statusThread.join(remaining(deadline));@b@@b@ for (TabletGroupWatcher watcher : this.watchers)@b@ watcher.join(remaining(deadline));@b@@b@ log.info("exiting");@b@ }@b@@b@ private long remaining(long deadline) {@b@ return Math.max(1L, deadline - System.currentTimeMillis());@b@ }@b@@b@ public ZooLock getMasterLock() {@b@ return this.masterLock;@b@ }@b@@b@ private void getMasterLock(java.lang.String zMasterLoc)@b@ throws KeeperException, InterruptedException@b@ {@b@ log.info("trying to get master lock");@b@@b@ java.lang.String masterClientAddress = AddressUtil.toString(new InetSocketAddress(this.hostname, getSystemConfiguration().getPort(Property.MASTER_CLIENTPORT)));@b@ while (true)@b@ {@b@ MasterLockWatcher masterLockWatcher = new MasterLockWatcher(null);@b@ this.masterLock = new ZooLock(zMasterLoc);@b@ this.masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes(Constants.UTF8));@b@@b@ masterLockWatcher.waitForChange();@b@@b@ if (masterLockWatcher.acquiredLock) {@b@ 
break;@b@ }@b@@b@ if (!(masterLockWatcher.failedToAcquireLock)) {@b@ throw new IllegalStateException("master lock in unknown state");@b@ }@b@@b@ this.masterLock.tryToCancelAsyncLockOrUnlock();@b@@b@ UtilWaitThread.sleep(1000L);@b@ }@b@@b@ setMasterState(MasterState.HAVE_LOCK);@b@ }@b@@b@ public static void main(java.lang.String[] args) throws Exception {@b@ try {@b@ SecurityUtil.serverLogin();@b@@b@ FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());@b@ java.lang.String hostname = Accumulo.getLocalAddress(args);@b@ Instance instance = HdfsZooInstance.getInstance();@b@ ServerConfiguration conf = new ServerConfiguration(instance);@b@ Accumulo.init(fs, conf, "master");@b@ Master master = new Master(conf, fs, hostname);@b@ Accumulo.enableTracing(hostname, "master");@b@ master.run();@b@ } catch (Exception ex) {@b@ log.error("Unexpected exception, exiting", ex);@b@ System.exit(1);@b@ }@b@ }@b@@b@ public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added)@b@ {@b@ DeadServerList obit = new DeadServerList(ZooUtil.getRoot(this.instance) + "/dead/tservers");@b@ if (added.size() > 0) {@b@ log.info("New servers: " + added);@b@ for (TServerInstance up : added)@b@ obit.delete(up.hostPort());@b@ }@b@ for (??? = deleted.iterator(); ???.hasNext(); ) { ??? 
= (TServerInstance)???.next();@b@ java.lang.String cause = "unexpected failure";@b@ if (this.serversToShutdown.contains(???))@b@ cause = "clean shutdown";@b@ if (!(getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)))@b@ obit.post(???.hostPort(), cause);@b@ }@b@@b@ Set unexpected = new HashSet(deleted);@b@ unexpected.removeAll(this.serversToShutdown);@b@ if ((unexpected.size() > 0) && @b@ (stillMaster()) && (!(getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)))) {@b@ log.warn("Lost servers " + unexpected);@b@ }@b@@b@ this.serversToShutdown.removeAll(deleted);@b@ this.badServers.keySet().removeAll(deleted);@b@@b@ synchronized (this.badServers) {@b@ cleanListByHostAndPort(this.badServers.keySet(), deleted, added);@b@ }@b@ synchronized (this.serversToShutdown) {@b@ cleanListByHostAndPort(this.serversToShutdown, deleted, added);@b@ }@b@@b@ synchronized (this.migrations) {@b@ Iterator iter = this.migrations.entrySet().iterator();@b@ while (iter.hasNext()) {@b@ Map.Entry entry = (Map.Entry)iter.next();@b@ if (deleted.contains(entry.getValue())) {@b@ log.info("Canceling migration of " + entry.getKey() + " to " + entry.getValue());@b@ iter.remove();@b@ }@b@ }@b@ }@b@ this.nextEvent.event("There are now %d tablet servers", new Object[] { Integer.valueOf(current.size()) });@b@ }@b@@b@ /**@b@ * Removes from {@code badServers} every entry whose host:port equals that of a server in {@code added} or {@code deleted}.@b@ * Each entry is removed at most once: a second Iterator.remove() for the same element throws IllegalStateException, which would happen if the same host:port were present in both sets (for example a tablet server restarting on the same address).@b@ *@b@ * @param badServers collection to prune in place; its iterator must support remove()@b@ * @param deleted servers that went away@b@ * @param added servers that appeared@b@ */@b@ private static void cleanListByHostAndPort(Collection<TServerInstance> badServers, Set<TServerInstance> deleted, Set<TServerInstance> added) {@b@ Iterator<TServerInstance> badIter = badServers.iterator();@b@ while (badIter.hasNext()) {@b@ TServerInstance bad = badIter.next();@b@ if ((matchesHostPort(bad, added)) || (matchesHostPort(bad, deleted)))@b@ badIter.remove();@b@ }@b@ }@b@@b@ /** Returns true when any server in {@code candidates} has the same host:port as {@code server}. */@b@ private static boolean matchesHostPort(TServerInstance server, Set<TServerInstance> candidates) {@b@ for (TServerInstance candidate : candidates)@b@ if (server.hostPort().equals(candidate.hostPort()))@b@ return true;@b@@b@ return false;@b@ }@b@@b@ public void stateChanged(java.lang.String tableId, TableState state)@b@ {@b@ this.nextEvent.event("Table state in zookeeper changed for %s to 
%s", new Object[] { tableId, state });@b@ if (TableState.OFFLINE == state)@b@ clearMigrations(tableId);@b@ }@b@@b@ /** No-op: this implementation ignores the initial table-state map. */@b@ public void initialize(Map<java.lang.String, TableState> tableIdToStateMap)@b@ {@b@ }@b@@b@ /** No-op. */@b@ public void sessionExpired()@b@ {@b@ }@b@@b@ /**@b@ * Returns the ids of the tables to treat as online.@b@ * When the master state is not NORMAL the result holds at most "!0", and it is empty in UNLOAD_METADATA_TABLETS; otherwise it holds every table id whose state in TableManager is ONLINE.@b@ * NOTE(review): "!0" is presumably the metadata table id - confirm.@b@ */@b@ public Set<java.lang.String> onlineTables() {@b@ Set result = new HashSet();@b@ if (getMasterState() != MasterState.NORMAL) {@b@ if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS)@b@ result.add("!0");@b@ return result;@b@ }@b@ TableManager manager = TableManager.getInstance();@b@@b@ for (java.lang.String tableId : Tables.getIdToNameMap(this.instance).keySet()) {@b@ TableState state = manager.getTableState(tableId);@b@ if ((state != null) && @b@ (state == TableState.ONLINE))@b@ result.add(tableId);@b@ }@b@@b@ return result;@b@ }@b@@b@ /** Returns the current servers as reported by {@code tserverSet}. */@b@ public Set<TServerInstance> onlineTabletServers()@b@ {@b@ return this.tserverSet.getCurrentServers();@b@ }@b@@b@ /** Collects the result of getMergeInfo for every table id returned by Tables.getIdToNameMap. */@b@ public Collection<MergeInfo> merges()@b@ {@b@ List result = new ArrayList();@b@ for (java.lang.String tableId : Tables.getIdToNameMap(this.instance).keySet())@b@ result.add(getMergeInfo(new Text(tableId)));@b@@b@ return result;@b@ }@b@@b@ /** Posts a "Forcing server down" event and adds {@code server} to {@code serversToShutdown}. */@b@ public void killTServer(TServerInstance server) {@b@ this.nextEvent.event("Forcing server down %s", new Object[] { server });@b@ this.serversToShutdown.add(server);@b@ }@b@@b@ /** Posts a "Tablet Server shutdown requested" event and adds {@code server} to {@code serversToShutdown}. */@b@ public void shutdownTServer(TServerInstance server)@b@ {@b@ this.nextEvent.event("Tablet Server shutdown requested for %s", new Object[] { server });@b@ this.serversToShutdown.add(server);@b@ }@b@@b@ /** Returns the event coordinator held in {@code nextEvent}. */@b@ public EventCoordinator getEventCoordinator() {@b@ return this.nextEvent;@b@ }@b@@b@ /** Returns the {@code instance} field. */@b@ public Instance getInstance() {@b@ return this.instance;@b@ }@b@@b@ /** Returns the AccumuloConfiguration obtained from {@code serverConfig}. */@b@ public AccumuloConfiguration getSystemConfiguration() {@b@ return this.serverConfig.getConfiguration();@b@ }@b@@b@ /** Returns the {@code serverConfig} field itself. */@b@ public ServerConfiguration getConfiguration() {@b@ return this.serverConfig;@b@ }@b@@b@ /** Returns the {@code fs} field. */@b@ public FileSystem getFileSystem() {@b@ return this.fs;@b@ }@b@@b@ public void 
updateRecoveryInProgress(java.lang.String file) {@b@ this.recoveriesInProgress.add(file);@b@ }@b@@b@ /**@b@ * Lock watcher passed to ZooLock.lockAsync: records whether the lock request succeeded or failed and lets a caller block until one of the two has happened.@b@ * Losing the lock, or losing the ability to monitor its node, halts the process.@b@ */@b@ private static class MasterLockWatcher@b@ implements ZooLock.AsyncLockWatcher@b@ {@b@ /* Both flags are written and read under this object's monitor by the synchronized methods below. */@b@ boolean acquiredLock;@b@ boolean failedToAcquireLock;@b@@b@ private MasterLockWatcher()@b@ {@b@ this.acquiredLock = false;@b@ this.failedToAcquireLock = false;@b@ }@b@@b@ /** Halts the process with exit status -1, reporting the reason the lock was lost. */@b@ public void lostLock(ZooLock.LockLossReason reason) {@b@ Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);@b@ }@b@@b@ /** Halts the process with exit status -1 after logging {@code e} at fatal level. */@b@ public void unableToMonitorLockNode(Throwable e)@b@ {@b@ Halt.halt(-1, new Runnable(this, e)@b@ {@b@ public void run() {@b@ Master.access$200().fatal("No longer able to monitor master lock node", this.val$e);@b@ }@b@ });@b@ }@b@@b@ /** Marks the lock as acquired and wakes waiters; halts if an outcome had already been recorded. */@b@ public synchronized void acquiredLock()@b@ {@b@ Master.access$200().debug("Acquired master lock");@b@@b@ if ((this.acquiredLock) || (this.failedToAcquireLock)) {@b@ Halt.halt("Zoolock in unexpected state AL " + this.acquiredLock + " " + this.failedToAcquireLock, -1);@b@ }@b@@b@ this.acquiredLock = true;@b@ super.notifyAll();@b@ }@b@@b@ /** Marks the lock request as failed and wakes waiters; halts if the lock had already been acquired. */@b@ public synchronized void failedToAcquireLock(Exception e)@b@ {@b@ Master.access$200().warn("Failed to get master lock " + e);@b@@b@ if (this.acquiredLock) {@b@ Halt.halt("Zoolock in unexpected state FAL " + this.acquiredLock + " " + this.failedToAcquireLock, -1);@b@ }@b@@b@ this.failedToAcquireLock = true;@b@ super.notifyAll();@b@ }@b@@b@ /**@b@ * Blocks until acquiredLock() or failedToAcquireLock(Exception) has been called.@b@ * The condition is re-checked in a loop because Object.wait() may return spuriously, or through an interrupt, before either flag is set; returning early would leave the caller with an undecided watcher.@b@ */@b@ public synchronized void waitForChange() {@b@ while ((!(this.acquiredLock)) && (!(this.failedToAcquireLock)))@b@ try {@b@ super.wait();@b@ }@b@ catch (InterruptedException ignored)@b@ {@b@ /* best effort, as before: ignore the interrupt and keep waiting for the lock outcome */@b@ }@b@ }@b@ }@b@@b@ private class StatusThread extends Daemon@b@ {@b@ public void run()@b@ {@b@ setName("Status Thread");@b@ EventCoordinator.Listener eventListener = Master.access$100(this.this$0).getListener();@b@ while (this.this$0.stillMaster()) {@b@ int count = 0;@b@ long wait = 10000L;@b@ try {@b@ switch 
(Master.5.$SwitchMap$org$apache$accumulo$core$master$thrift$MasterGoalState[this.this$0.getMasterGoalState().ordinal()])@b@ {@b@ case 1:@b@ Master.access$2300(this.this$0, MasterState.NORMAL);@b@ break;@b@ case 2:@b@ if (Master.access$1100(this.this$0) == MasterState.NORMAL)@b@ Master.access$2300(this.this$0, MasterState.SAFE_MODE);@b@@b@ if (Master.access$1100(this.this$0) == MasterState.HAVE_LOCK)@b@ Master.access$2300(this.this$0, MasterState.SAFE_MODE); break;@b@ case 3:@b@ switch (Master.5.$SwitchMap$org$apache$accumulo$core$master$thrift$MasterState[Master.access$1100(this.this$0).ordinal()])@b@ {@b@ case 1:@b@ Master.access$2300(this.this$0, MasterState.SAFE_MODE);@b@ break;@b@ case 2:@b@ count = Master.access$2400(this.this$0);@b@ Master.access$200().debug(java.lang.String.format("There are %d non-metadata tablets assigned or hosted", new Object[] { Integer.valueOf(count) }));@b@ if (count == 0)@b@ Master.access$2300(this.this$0, MasterState.UNLOAD_METADATA_TABLETS); break;@b@ case 3:@b@ count = Master.access$2600(this.this$0, Master.access$2500());@b@ Master.access$200().debug(java.lang.String.format("There are %d metadata tablets assigned or hosted", new Object[] { Integer.valueOf(count) }));@b@@b@ if (count == 1)@b@ Master.access$2300(this.this$0, MasterState.UNLOAD_ROOT_TABLET); break;@b@ case 4:@b@ count = Master.access$2600(this.this$0, Master.access$2500());@b@ if (count > 0)@b@ Master.access$200().debug(java.lang.String.format("The root tablet is still assigned or hosted", new Object[0]));@b@ if (count == 0) {@b@ Set currentServers = Master.access$700(this.this$0).getCurrentServers();@b@ Master.access$200().debug("stopping " + currentServers.size() + " tablet servers");@b@ for (TServerInstance server : currentServers)@b@ try {@b@ Master.access$1300(this.this$0).add(server);@b@ Master.access$700(this.this$0).getConnection(server).fastHalt(Master.access$800(this.this$0));@b@ } catch (TException e) {@b@ }@b@ finally {@b@ 
Master.access$700(this.this$0).remove(server);@b@ }@b@@b@ if (currentServers.size() == 0)@b@ Master.access$2300(this.this$0, MasterState.STOP);@b@ }@b@@b@ }@b@@b@ }@b@@b@ wait = updateStatus();@b@ eventListener.waitForEvents(wait);@b@ } catch (Throwable t) {@b@ Master.access$200().error("Error balancing tablets", t);@b@ UtilWaitThread.sleep(1000L);@b@ }@b@ }@b@ }@b@@b@ private long updateStatus() throws AccumuloException, AccumuloSecurityException, TableNotFoundException {@b@ Master.access$902(this.this$0, Collections.synchronizedSortedMap(Master.access$2700(this.this$0)));@b@ checkForHeldServer(Master.access$900(this.this$0));@b@@b@ if (!(Master.access$1000(this.this$0).isEmpty()))@b@ Master.access$200().debug("not balancing because the balance information is out-of-date " + Master.access$1000(this.this$0).keySet());@b@ else if (Master.access$2800(this.this$0) > 0)@b@ Master.access$200().debug("not balancing because there are unhosted tablets");@b@ else if (this.this$0.getMasterGoalState() == MasterGoalState.CLEAN_STOP)@b@ Master.access$200().debug("not balancing because the master is attempting to stop cleanly");@b@ else if (!(Master.access$1300(this.this$0).isEmpty()))@b@ Master.access$200().debug("not balancing while shutting down servers " + Master.access$1300(this.this$0));@b@ else@b@ return balanceTablets();@b@@b@ return 10000L;@b@ }@b@@b@ private void checkForHeldServer() {@b@ TServerInstance instance = null;@b@ int crazyHoldTime = 0;@b@ int someHoldTime = 0;@b@ long maxWait = this.this$0.getSystemConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);@b@ for (Map.Entry entry : tserverStatus.entrySet())@b@ if (((TabletServerStatus)entry.getValue()).getHoldTime() > 0L) {@b@ ++someHoldTime;@b@ if (((TabletServerStatus)entry.getValue()).getHoldTime() > maxWait) {@b@ instance = (TServerInstance)entry.getKey();@b@ ++crazyHoldTime;@b@ }@b@ }@b@@b@ if ((crazyHoldTime == 1) && (someHoldTime == 1) && (tserverStatus.size() > 1)) {@b@ 
Master.access$200().warn("Tablet server " + instance + " exceeded maximum hold time: attempting to kill it");@b@ try {@b@ LiveTServerSet.TServerConnection connection = Master.access$700(this.this$0).getConnection(instance);@b@ if (connection != null)@b@ connection.fastHalt(Master.access$800(this.this$0));@b@ } catch (TException e) {@b@ Master.access$200().error(e, e);@b@ }@b@ Master.access$700(this.this$0).remove(instance);@b@ }@b@ }@b@@b@ private long balanceTablets() {@b@ List migrationsOut = new ArrayList();@b@ Set migrationsCopy = new HashSet();@b@ synchronized (Master.access$1600(this.this$0)) {@b@ migrationsCopy.addAll(Master.access$1600(this.this$0).keySet());@b@ }@b@ long wait = Master.access$1900(this.this$0).balance(Collections.unmodifiableSortedMap(Master.access$900(this.this$0)), Collections.unmodifiableSet(migrationsCopy), migrationsOut);@b@@b@ Iterator i$ = TabletBalancer.checkMigrationSanity(Master.access$900(this.this$0).keySet(), migrationsOut).iterator();@b@ while (true) { TabletMigration m;@b@ while (true) { if (!(i$.hasNext())) break label229; m = (TabletMigration)i$.next();@b@ if (!(Master.access$1600(this.this$0).containsKey(m.tablet))) break;@b@ Master.access$200().warn("balancer requested migration more than once, skipping " + m);@b@ }@b@@b@ Master.access$1600(this.this$0).put(m.tablet, m.newServer);@b@ Master.access$200().debug("migration " + m);@b@ }@b@ if (migrationsOut.size() > 0)@b@ label229: Master.access$100(this.this$0).event("Migrating %d more tablets, %d total", new Object[] { Integer.valueOf(migrationsOut.size()), Integer.valueOf(Master.access$1600(this.this$0).size()) });@b@@b@ return wait;@b@ }@b@ }@b@@b@ private class MigrationCleanupThread extends Daemon@b@ {@b@ public void run()@b@ {@b@ setName("Migration Cleanup Thread");@b@ while (this.this$0.stillMaster()) {@b@ if (!(Master.access$1600(this.this$0).isEmpty()))@b@ try {@b@ cleanupOfflineMigrations();@b@ cleanupNonexistentMigrations(this.this$0.getConnector());@b@ } catch 
(Exception ex) {@b@ Master.access$200().error("Error cleaning up migrations", ex);@b@ }@b@@b@ UtilWaitThread.sleep(300000L);@b@ }@b@ }@b@@b@ private void cleanupNonexistentMigrations()@b@ throws AccumuloException, AccumuloSecurityException, TableNotFoundException@b@ {@b@ Scanner scanner = connector.createScanner("!METADATA", Constants.NO_AUTHS);@b@ Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);@b@ Set found = new HashSet();@b@ for (Map.Entry entry : scanner) {@b@ KeyExtent extent = new KeyExtent(((Key)entry.getKey()).getRow(), (Value)entry.getValue());@b@ if (Master.access$1600(this.this$0).containsKey(extent))@b@ found.add(extent);@b@ }@b@@b@ Master.access$1600(this.this$0).keySet().retainAll(found);@b@ }@b@@b@ private void cleanupOfflineMigrations()@b@ {@b@ TableManager manager = TableManager.getInstance();@b@ for (java.lang.String tableId : Tables.getIdToNameMap(Master.access$300(this.this$0)).keySet()) {@b@ TableState state = manager.getTableState(tableId);@b@ if (TableState.OFFLINE == state)@b@ this.this$0.clearMigrations(tableId);@b@ }@b@ }@b@ }@b@@b@ private class TabletGroupWatcher extends Daemon@b@ {@b@ final TabletStateStore store;@b@ final TabletGroupWatcher dependentWatcher;@b@ final TableStats stats = new TableStats();@b@@b@ TabletGroupWatcher(, TabletStateStore paramTabletStateStore, TabletGroupWatcher paramTabletGroupWatcher)@b@ {@b@ this.store = paramTabletStateStore;@b@ this.dependentWatcher = paramTabletGroupWatcher;@b@ }@b@@b@ Map<Text, TableCounts> getStats() {@b@ return this.stats.getLast();@b@ }@b@@b@ TableCounts getStats() {@b@ return this.stats.getLast(tableId);@b@ }@b@@b@ public void run()@b@ {@b@ Thread.currentThread().setName("Watching " + this.store.name());@b@ int[] oldCounts = new int[TabletState.values().length];@b@ EventCoordinator.Listener eventListener = Master.access$100(this.this$0).getListener();@b@@b@ while (this.this$0.stillMaster()) {@b@ int totalUnloaded = 0;@b@ int unloaded = 0;@b@ try { Map mergeStatsCache;@b@ Map 
currentMerges;@b@ SortedMap currentTServers;@b@ while (true) { mergeStatsCache = new HashMap();@b@ currentMerges = new HashMap();@b@ for (MergeInfo merge : this.this$0.merges()) {@b@ if (merge.getRange() != null) {@b@ currentMerges.put(merge.getRange().getTableId(), new MergeStats(merge));@b@ }@b@@b@ }@b@@b@ currentTServers = new TreeMap();@b@ for (TServerInstance entry : Master.access$700(this.this$0).getCurrentServers()) {@b@ currentTServers.put(entry, Master.access$900(this.this$0).get(entry));@b@ }@b@@b@ if (currentTServers.size() != 0) break;@b@ eventListener.waitForEvents(60000L);@b@ }@b@@b@ SortedMap destinations = new TreeMap(currentTServers);@b@ destinations.keySet().removeAll(Master.access$1300(this.this$0));@b@@b@ List assignments = new ArrayList();@b@ List assigned = new ArrayList();@b@ List assignedToDeadServers = new ArrayList();@b@ Map unassigned = new HashMap();@b@@b@ int[] counts = new int[TabletState.values().length];@b@ this.stats.begin();@b@@b@ Iterator i$ = this.store.iterator();@b@ while (true) { TabletLocationState tls;@b@ Master.TabletGoalState goal;@b@ TServerInstance server;@b@ TabletState state;@b@ while (true) { while (true) { while (true) { if (!(i$.hasNext())) break label1230; tls = (TabletLocationState)i$.next();@b@ if (tls != null)@b@ break;@b@ }@b@@b@ if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) != null)@b@ break;@b@ }@b@@b@ if (unassigned.size() + unloaded > 5000 * currentTServers.size()) {@b@ flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);@b@ assignments.clear();@b@ assigned.clear();@b@ assignedToDeadServers.clear();@b@ unassigned.clear();@b@ unloaded = 0;@b@ eventListener.waitForEvents(60000L);@b@ }@b@ Text tableId = tls.extent.getTableId();@b@ MergeStats mergeStats = (MergeStats)mergeStatsCache.get(tableId);@b@ if (mergeStats == null) {@b@ mergeStats = (MergeStats)currentMerges.get(tableId);@b@ if (mergeStats == null)@b@ mergeStats = new MergeStats(new 
MergeInfo());@b@@b@ mergeStatsCache.put(tableId, mergeStats);@b@ }@b@ goal = this.this$0.getGoalState(tls, mergeStats.getMergeInfo());@b@ server = tls.getServer();@b@ state = tls.getState(currentTServers.keySet());@b@ this.stats.update(tableId, state);@b@ mergeStats.update(tls.extent, state, tls.chopped, !(tls.walogs.isEmpty()));@b@ sendChopRequest(mergeStats.getMergeInfo(), state, tls);@b@ sendSplitRequest(mergeStats.getMergeInfo(), state, tls);@b@@b@ if (state == TabletState.ASSIGNED) {@b@ goal = Master.TabletGoalState.HOSTED;@b@ }@b@@b@ if ((goal == Master.TabletGoalState.UNASSIGNED) && (state == TabletState.HOSTED) && @b@ (Master.access$1300(this.this$0).equals(currentTServers.keySet())) && @b@ (this.dependentWatcher != null) && (this.dependentWatcher.assignedOrHosted() > 0)) {@b@ goal = Master.TabletGoalState.HOSTED;@b@ }@b@@b@ if (goal != Master.TabletGoalState.HOSTED) break label1073;@b@ if ((state == TabletState.HOSTED) || (tls.walogs.isEmpty()) || @b@ (!(Master.access$2200(this.this$0).recoverLogs(tls.extent, tls.walogs))))@b@ break;@b@ }@b@ switch (Master.5.$SwitchMap$org$apache$accumulo$server$master$state$TabletState[state.ordinal()])@b@ {@b@ case 1:@b@ if (server.equals(Master.access$1600(this.this$0).get(tls.extent)))@b@ Master.access$1600(this.this$0).remove(tls.extent); break;@b@ case 2:@b@ assignedToDeadServers.add(tls);@b@ if (server.equals(Master.access$1600(this.this$0).get(tls.extent)))@b@ Master.access$1600(this.this$0).remove(tls.extent); break;@b@ case 3:@b@ TServerInstance dest = (TServerInstance)Master.access$1600(this.this$0).get(tls.extent);@b@ if (dest != null)@b@ {@b@ if (destinations.keySet().contains(dest)) {@b@ assignments.add(new Assignment(tls.extent, dest));@b@ }@b@ else {@b@ Master.access$1600(this.this$0).remove(tls.extent);@b@ unassigned.put(tls.extent, server);@b@ }@b@ }@b@ else unassigned.put(tls.extent, server);@b@@b@ break;@b@ case 4:@b@ assigned.add(new Assignment(tls.extent, tls.future));@b@ }@b@@b@ switch 
(Master.5.$SwitchMap$org$apache$accumulo$server$master$state$TabletState[state.ordinal()])@b@ {@b@ case 3:@b@ break;@b@ case 2:@b@ assignedToDeadServers.add(tls);@b@@b@ break;@b@ case 1:@b@ label1073: LiveTServerSet.TServerConnection conn = Master.access$700(this.this$0).getConnection(server);@b@ if (conn != null) {@b@ conn.unloadTablet(Master.access$800(this.this$0), tls.extent, goal != Master.TabletGoalState.DELETED);@b@ ++unloaded;@b@ ++totalUnloaded;@b@ } else {@b@ Master.access$200().warn("Could not connect to server " + server);@b@ }@b@@b@ case 4:@b@ }@b@@b@ counts[state.ordinal()] += 1;@b@ }@b@@b@ label1230: flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned);@b@@b@ this.stats.end();@b@@b@ TabletState[] arr$ = TabletState.values(); int len$ = arr$.length; for (int i$ = 0; i$ < len$; ++i$) { TabletState state = arr$[i$];@b@ int i = state.ordinal();@b@ if ((counts[i] > 0) && (counts[i] != oldCounts[i]))@b@ Master.access$100(this.this$0).event("[%s]: %d tablets are %s", new Object[] { this.store.name(), Integer.valueOf(counts[i]), state.name() });@b@ }@b@@b@ Master.access$200().debug(java.lang.String.format("[%s]: scan time %.2f seconds", new Object[] { this.store.name(), Double.valueOf(this.stats.getScanTime() / 1000.0D) }));@b@ oldCounts = counts;@b@ if (totalUnloaded > 0) {@b@ Master.access$100(this.this$0).event("[%s]: %d tablets unloaded", new Object[] { this.store.name(), Integer.valueOf(totalUnloaded) });@b@ }@b@@b@ updateMergeState(mergeStatsCache);@b@@b@ Master.access$200().debug(java.lang.String.format("[%s] sleeping for %.2f seconds", new Object[] { this.store.name(), Double.valueOf(60.0D) }));@b@ eventListener.waitForEvents(60000L);@b@ } catch (Exception ex) {@b@ Master.access$200().error("Error processing table state for store " + this.store.name(), ex);@b@ if ((ex.getCause() != null) && (ex.getCause() instanceof TabletLocationState.BadLocationStateException))@b@ 
repairMetadata(((TabletLocationState.BadLocationStateException)ex.getCause()).getEncodedEndRow());@b@ else@b@ UtilWaitThread.sleep(1000L);@b@ }@b@ }@b@ }@b@@b@ private void repairMetadata()@b@ {@b@ Master.access$200().debug("Attempting repair on " + row);@b@ try@b@ {@b@ Map future = new HashMap();@b@ Map assigned = new HashMap();@b@ Scanner scanner = this.this$0.getConnector().createScanner("!METADATA", Constants.NO_AUTHS);@b@ scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);@b@ scanner.fetchColumnFamily(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY);@b@ scanner.setRange(new Range(row));@b@ for (Map.Entry entry : scanner)@b@ if (((Key)entry.getKey()).getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY))@b@ assigned.put(entry.getKey(), entry.getValue());@b@ else if (((Key)entry.getKey()).getColumnFamily().equals(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY))@b@ future.put(entry.getKey(), entry.getValue());@b@@b@@b@ if ((future.size() > 0) && (assigned.size() > 0)) {@b@ Master.access$200().warn("Found a tablet assigned and hosted, attempting to repair");@b@ } else if ((future.size() > 1) && (assigned.size() == 0)) {@b@ Master.access$200().warn("Found a tablet assigned to multiple servers, attempting to repair");@b@ } else if ((future.size() == 0) && (assigned.size() > 1)) {@b@ Master.access$200().warn("Found a tablet hosted on multiple servers, attempting to repair");@b@ } else {@b@ Master.access$200().info("Attempted a repair, but nothing seems to be obviously wrong. 
" + assigned + " " + future);@b@ return;@b@ }@b@ Map all = new HashMap();@b@ all.putAll(future);@b@ all.putAll(assigned);@b@ for (Map.Entry entry : all.entrySet()) {@b@ TServerInstance alive = Master.access$700(this.this$0).find(((Value)entry.getValue()).toString());@b@ if (alive == null) {@b@ Master.access$200().info("Removing entry " + entry);@b@ BatchWriter bw = this.this$0.getConnector().createBatchWriter("!METADATA", new BatchWriterConfig());@b@ Mutation m = new Mutation(((Key)entry.getKey()).getRow());@b@ m.putDelete(((Key)entry.getKey()).getColumnFamily(), ((Key)entry.getKey()).getColumnQualifier());@b@ bw.addMutation(m);@b@ bw.close();@b@ return;@b@ }@b@ }@b@ Master.access$200().error("Metadata table is inconsistent at " + row + " and all assigned/future tservers are still online.");@b@ } catch (Throwable e) {@b@ Master.access$200().error("Error attempting repair of metadata " + row + ": " + e, e);@b@ }@b@ }@b@@b@ private int assignedOrHosted() {@b@ int result = 0;@b@ for (TableCounts counts : this.stats.getLast().values())@b@ result += counts.assigned() + counts.hosted();@b@@b@ return result;@b@ }@b@@b@ private void sendSplitRequest(, TabletState state, TabletLocationState tls)@b@ {@b@ Text[] arr$;@b@ int i$;@b@ if (!(info.getState().equals(MergeState.SPLITTING)))@b@ return;@b@@b@ if (!(info.isDelete()))@b@ return;@b@@b@ if (!(state.equals(TabletState.HOSTED)))@b@ return;@b@@b@ KeyExtent range = info.getRange();@b@ if (tls.extent.overlaps(range)) {@b@ arr$ = new Text[] { range.getPrevEndRow(), range.getEndRow() }; int len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { Text splitPoint = arr$[i$];@b@ if (splitPoint == null)@b@ break label336:@b@ if (!(tls.extent.contains(splitPoint)))@b@ break label336:@b@ if (splitPoint.equals(tls.extent.getEndRow()))@b@ break label336:@b@ if (splitPoint.equals(tls.extent.getPrevEndRow()))@b@ break label336:@b@ try@b@ {@b@ LiveTServerSet.TServerConnection conn = Master.access$700(this.this$0).getConnection(tls.current);@b@ 
if (conn != null) {@b@ Master.access$200().info("Asking " + tls.current + " to split " + tls.extent + " at " + splitPoint);@b@ conn.splitTablet(Master.access$800(this.this$0), tls.extent, splitPoint);@b@ } else {@b@ Master.access$200().warn("Not connected to server " + tls.current);@b@ }@b@ } catch (org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException e) {@b@ Master.access$200().debug("Error asking tablet server to split a tablet: " + e);@b@ } catch (Exception e) {@b@ Master.access$200().warn("Error asking tablet server to split a tablet: " + e);@b@ }@b@ }@b@ }@b@ label336:@b@ }@b@@b@ private void sendChopRequest(, TabletState state, TabletLocationState tls) {@b@ if (!(info.getState().equals(MergeState.WAITING_FOR_CHOPPED)))@b@ return;@b@@b@ if (!(state.equals(TabletState.HOSTED)))@b@ return;@b@@b@ if (tls.chopped)@b@ return;@b@@b@ if (info.needsToBeChopped(tls.extent))@b@ try@b@ {@b@ LiveTServerSet.TServerConnection conn = Master.access$700(this.this$0).getConnection(tls.current);@b@ if (conn != null) {@b@ Master.access$200().info("Asking " + tls.current + " to chop " + tls.extent);@b@ conn.chop(Master.access$800(this.this$0), tls.extent);@b@ } else {@b@ Master.access$200().warn("Could not connect to server " + tls.current);@b@ }@b@ } catch (TException e) {@b@ Master.access$200().warn("Communications error asking tablet server to chop a tablet");@b@ }@b@ }@b@@b@ private void updateMergeState()@b@ {@b@ for (MergeStats stats : mergeStatsCache.values())@b@ try {@b@ MergeState update = stats.nextMergeState(this.this$0.getConnector(), this.this$0);@b@@b@ if (update == MergeState.COMPLETE)@b@ update = MergeState.NONE;@b@ if (update != stats.getMergeInfo().getState()) {@b@ this.this$0.setMergeState(stats.getMergeInfo(), update);@b@ }@b@@b@ if (update == MergeState.MERGING)@b@ try {@b@ if (stats.getMergeInfo().isDelete())@b@ deleteTablets(stats.getMergeInfo());@b@ else@b@ mergeMetadataRecords(stats.getMergeInfo());@b@@b@ 
this.this$0.setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE);@b@ } catch (Exception ex) {@b@ Master.access$200().error("Unable merge metadata table records", ex);@b@ }@b@ }@b@ catch (Exception ex) {@b@ Master.access$200().error("Unable to update merge state for merge " + stats.getMergeInfo().getRange(), ex);@b@ }@b@ }@b@@b@ private void deleteTablets() throws AccumuloException@b@ {@b@ KeyExtent range = info.getRange();@b@ Master.access$200().debug("Deleting tablets for " + range);@b@ char timeType = ';@b@ KeyExtent followingTablet = null;@b@ if (range.getEndRow() != null) {@b@ Key nextExtent = new Key(range.getEndRow()).followingKey(PartialKey.ROW);@b@ followingTablet = getHighTablet(new KeyExtent(range.getTableId(), nextExtent.getRow(), range.getEndRow()));@b@ Master.access$200().debug("Found following tablet " + followingTablet);@b@ }@b@ try {@b@ Connector conn = this.this$0.getConnector();@b@ Text start = range.getPrevEndRow();@b@ if (start == null)@b@ start = new Text();@b@@b@ Master.access$200().debug("Making file deletion entries for " + range);@b@ Range deleteRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, KeyExtent.getMetadataEntry(range.getTableId(), range.getEndRow()), true);@b@@b@ Scanner scanner = conn.createScanner("!METADATA", Constants.NO_AUTHS);@b@ scanner.setRange(deleteRange);@b@ Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);@b@ Constants.METADATA_TIME_COLUMN.fetch(scanner);@b@ scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);@b@ scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);@b@ Set datafiles = new TreeSet();@b@ for (Map.Entry entry : scanner) {@b@ Key key = (Key)entry.getKey();@b@ if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {@b@ datafiles.add(key.getColumnQualifier().toString());@b@ if (datafiles.size() > 1000) {@b@ MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());@b@ 
datafiles.clear();@b@ }@b@ } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {@b@ timeType = ((Value)entry.getValue()).toString().charAt(0); } else {@b@ if (key.compareColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY) == 0)@b@ throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");@b@ if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {@b@ datafiles.add(((Value)entry.getValue()).toString());@b@ if (datafiles.size() > 1000) {@b@ MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());@b@ datafiles.clear(); }@b@ }@b@ }@b@ }@b@ MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());@b@ BatchWriter bw = conn.createBatchWriter("!METADATA", new BatchWriterConfig());@b@ try {@b@ deleteTablets(deleteRange, bw, conn);@b@ } finally {@b@ bw.close();@b@ }@b@@b@ if (followingTablet != null) {@b@ Master.access$200().debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow());@b@ bw = conn.createBatchWriter("!METADATA", new BatchWriterConfig());@b@ try {@b@ Mutation m = new Mutation(followingTablet.getMetadataEntry());@b@ Constants.METADATA_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(range.getPrevEndRow()));@b@ Constants.METADATA_CHOPPED_COLUMN.putDelete(m);@b@ bw.addMutation(m);@b@ bw.flush();@b@ } finally {@b@ bw.close();@b@ }@b@ }@b@ else {@b@ Master.access$200().debug("Recreating the last tablet to point to " + range.getPrevEndRow());@b@ MetadataTable.addTablet(new KeyExtent(range.getTableId(), null, range.getPrevEndRow()), "/default_tablet", SecurityConstants.getSystemCredentials(), timeType, Master.access$800(this.this$0));@b@ }@b@ }@b@ catch (Exception ex) {@b@ throw new AccumuloException(ex);@b@ }@b@ }@b@@b@ private void mergeMetadataRecords() throws AccumuloException {@b@ KeyExtent range = info.getRange();@b@ Master.access$200().debug("Merging metadata for " + range);@b@ KeyExtent stop = 
getHighTablet(range);@b@ Master.access$200().debug("Highest tablet is " + stop);@b@ Value firstPrevRowValue = null;@b@ Text stopRow = stop.getMetadataEntry();@b@ Text start = range.getPrevEndRow();@b@ if (start == null)@b@ start = new Text();@b@@b@ Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false);@b@ if (range.isMeta())@b@ scanRange = scanRange.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);@b@@b@ BatchWriter bw = null;@b@ try {@b@ long fileCount = 0L;@b@ Connector conn = this.this$0.getConnector();@b@@b@ bw = conn.createBatchWriter("!METADATA", new BatchWriterConfig());@b@ Scanner scanner = conn.createScanner("!METADATA", Constants.NO_AUTHS);@b@ scanner.setRange(scanRange);@b@ Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);@b@ Constants.METADATA_TIME_COLUMN.fetch(scanner);@b@ Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);@b@ scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);@b@ Mutation m = new Mutation(stopRow);@b@ java.lang.String maxLogicalTime = null;@b@ for (Map.Entry entry : scanner) {@b@ Key key = (Key)entry.getKey();@b@ Value value = (Value)entry.getValue();@b@ if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {@b@ m.put(key.getColumnFamily(), key.getColumnQualifier(), value);@b@ fileCount += 1L;@b@ } else if ((Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key)) && (firstPrevRowValue == null)) {@b@ Master.access$200().debug("prevRow entry for lowest tablet is " + value);@b@ firstPrevRowValue = new Value(value);@b@ } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {@b@ maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());@b@ } else if ((Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) && @b@ (!(range.isMeta()))) {@b@ bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), ((Value)entry.getValue()).toString()));@b@ }@b@@b@ }@b@@b@ scanner = conn.createScanner("!METADATA", 
Constants.NO_AUTHS);@b@ /* Second scan: read only stopRow itself (the first scan range excluded it) so the highest tablet's own time entry also takes part in the maximum. */@b@ Range last = new Range(stopRow);@b@ if (range.isMeta())@b@ last = last.clip(Constants.METADATA_ROOT_TABLET_KEYSPACE);@b@ scanner.setRange(last);@b@ Constants.METADATA_TIME_COLUMN.fetch(scanner);@b@ for (Map.Entry entry : scanner) {@b@ if (Constants.METADATA_TIME_COLUMN.hasColumns((Key)entry.getKey()))@b@ maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, ((Value)entry.getValue()).toString());@b@@b@ }@b@@b@ if (maxLogicalTime != null)@b@ Constants.METADATA_TIME_COLUMN.put(m, new Value(maxLogicalTime.getBytes(Constants.UTF8)));@b@@b@ if (!(m.getUpdates().isEmpty())) {@b@ bw.addMutation(m);@b@ }@b@@b@ bw.flush();@b@@b@ Master.access$200().debug("Moved " + fileCount + " files to " + stop);@b@@b@ /* No prevRow entry was seen in the scanned range: nothing is left to fold into the highest tablet. */@b@ if (firstPrevRowValue == null) {@b@ Master.access$200().debug("tablet already merged");@b@@b@ return;@b@ }@b@ stop.setPrevEndRow(KeyExtent.decodePrevEndRow(firstPrevRowValue));@b@ Mutation updatePrevRow = stop.getPrevRowUpdateMutation();@b@ Master.access$200().debug("Setting the prevRow for last tablet: " + stop);@b@ bw.addMutation(updatePrevRow);@b@ bw.flush();@b@@b@ deleteTablets(scanRange, bw, conn);@b@@b@ m = new Mutation(stopRow);@b@ Constants.METADATA_CHOPPED_COLUMN.putDelete(m);@b@ bw.addMutation(m);@b@ }@b@ catch (Exception ex)@b@ {@b@ /* Report the failure to the caller; an empty handler here would make a half-finished metadata merge look successful. */@b@ throw new AccumuloException(ex);@b@ }@b@ finally@b@ {@b@ /* Single close point for the writer; it also covers the early return above. */@b@ if (bw != null)@b@ try {@b@ bw.close();@b@ } catch (Exception ex) {@b@ throw new AccumuloException(ex);@b@ }@b@ }@b@ }@b@@b@ /**@b@ * Deletes every entry found in the given range of the "!METADATA" table.@b@ * Each scanned row becomes one Mutation holding a delete for every column of that row; the writer is flushed once after all rows are queued.@b@ *@b@ * @param scanRange range of the "!METADATA" table to scan and delete@b@ * @param bw writer that receives the delete mutations; it is flushed but not closed here@b@ * @param conn connector used to create the scanner@b@ * @throws TableNotFoundException declared for the scanner creation@b@ * @throws MutationsRejectedException declared for the writer calls@b@ */@b@ private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn)@b@ throws TableNotFoundException, MutationsRejectedException@b@ {@b@ Scanner scanner = conn.createScanner("!METADATA", Constants.NO_AUTHS);@b@ Master.access$200().debug("Deleting range " + scanRange);@b@ scanner.setRange(scanRange);@b@ RowIterator rowIter = new RowIterator(scanner);@b@ while (rowIter.hasNext()) {@b@ Iterator row = rowIter.next();@b@ Mutation m = null;@b@ while (row.hasNext()) {@b@ Map.Entry 
entry = (Map.Entry)row.next();@b@ Key key = (Key)entry.getKey();@b@@b@ if (m == null)@b@ m = new Mutation(key.getRow());@b@@b@ m.putDelete(key.getColumnFamily(), key.getColumnQualifier());@b@ Master.access$200().debug("deleting entry " + key);@b@ }@b@ bw.addMutation(m);@b@ }@b@@b@ bw.flush(); }@b@@b@ /**@b@ * Finds the tablet that ends a merge range.@b@ * Scans the prev-row column of "!METADATA" starting at the metadata row for (table id, end row) of the range and builds a KeyExtent from the first entry returned.@b@ *@b@ * @param range the extent being merged; its table id and end row pick the scan start@b@ * @return the extent decoded from the first prev-row entry at or after the scan start@b@ * @throws AccumuloException if the scan returns nothing, if the entry found belongs to another table, or if any other failure occurs (every failure is wrapped with the range in the message)@b@ */@b@ private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {@b@ try {@b@ Connector conn = this.this$0.getConnector();@b@ Scanner scanner = conn.createScanner("!METADATA", Constants.NO_AUTHS);@b@ Constants.METADATA_PREV_ROW_COLUMN.fetch(scanner);@b@ KeyExtent start = new KeyExtent(range.getTableId(), range.getEndRow(), null);@b@ scanner.setRange(new Range(start.getMetadataEntry(), null));@b@ Iterator iterator = scanner.iterator();@b@ if (!(iterator.hasNext()))@b@ throw new AccumuloException("No last tablet for a merge " + range);@b@@b@ Map.Entry entry = (Map.Entry)iterator.next();@b@ KeyExtent highTablet = new KeyExtent(((Key)entry.getKey()).getRow(), KeyExtent.decodePrevEndRow((Value)entry.getValue()));@b@ /* Compare table ids by value: an identity test is only right when both extents happen to share one id object. */@b@ if (!(highTablet.getTableId().equals(range.getTableId())))@b@ throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);@b@@b@ return highTablet;@b@ } catch (Exception ex) {@b@ throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex);@b@ }@b@ }@b@@b@ /* NOTE(review): the parameter list below starts with a bare comma and the body reads an undeclared currentTServers; the first parameter appears to have been dropped by the decompiler - restore it from the original source. */@b@ private void flushChanges(, List<Assignment> assignments, List<Assignment> assigned, List<TabletLocationState> assignedToDeadServers, Map<KeyExtent, TServerInstance> unassigned) throws DistributedStoreException, TException@b@ {@b@ if (!(assignedToDeadServers.isEmpty())) {@b@ int maxServersToShow = Math.min(assignedToDeadServers.size(), 100);@b@ Master.access$200().debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "...");@b@ this.store.unassign(assignedToDeadServers);@b@ Master.access$100(this.this$0).event("Marked %d tablets as unassigned because they don't have current servers", 
new Object[] { Integer.valueOf(assignedToDeadServers.size()) });@b@ }@b@@b@ if (!(currentTServers.isEmpty())) {@b@ Map assignedOut = new HashMap();@b@ Master.access$1900(this.this$0).getAssignments(Collections.unmodifiableSortedMap(currentTServers), Collections.unmodifiableMap(unassigned), assignedOut);@b@ for (Map.Entry assignment : assignedOut.entrySet())@b@ if (unassigned.containsKey(assignment.getKey()))@b@ if (assignment.getValue() != null) {@b@ Master.access$200().debug(this.store.name() + " assigning tablet " + assignment);@b@ assignments.add(new Assignment((KeyExtent)assignment.getKey(), (TServerInstance)assignment.getValue()));@b@ }@b@ else@b@ Master.access$200().warn(this.store.name() + " load balancer assigning tablet that was not nominated for assignment " + assignment.getKey());@b@@b@@b@ if ((!(unassigned.isEmpty())) && (assignedOut.isEmpty()))@b@ Master.access$200().warn("Load balancer failed to assign any tablets");@b@ }@b@@b@ if (assignments.size() > 0) {@b@ Master.access$200().info(java.lang.String.format("Assigning %d tablets", new Object[] { Integer.valueOf(assignments.size()) }));@b@ this.store.setFutureLocations(assignments);@b@ }@b@ assignments.addAll(assigned);@b@ for (Assignment a : assignments) {@b@ LiveTServerSet.TServerConnection conn = Master.access$700(this.this$0).getConnection(a.server);@b@ if (conn != null)@b@ conn.assignTablet(Master.access$800(this.this$0), a.tablet);@b@ else@b@ Master.access$200().warn("Could not connect to server " + a.server);@b@ }@b@ }@b@ }@b@@b@ static enum TabletGoalState@b@ {@b@ HOSTED, UNASSIGNED, DELETED;@b@ }@b@@b@ private class MasterClientServiceHandler@b@ implements MasterClientService.Iface@b@ {@b@ protected java.lang.String checkTableId(, TableOperation operation)@b@ throws org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException@b@ {@b@ java.lang.String tableId = (java.lang.String)Tables.getNameToIdMap(this.this$0.getConfiguration().getInstance()).get(tableName);@b@ if (tableId == 
null)@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.NOTFOUND, null);@b@ return tableId;@b@ }@b@@b@ public long initiateFlush(, TCredentials c, java.lang.String tableId) throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, TException@b@ {@b@ byte[] fid;@b@ Master.access$600(this.this$0).canFlush(c, tableId);@b@@b@ java.lang.String zTablePath = "/accumulo/" + this.this$0.getConfiguration().getInstance().getInstanceID() + "/tables" + "/" + tableId + "/flush-id";@b@@b@ IZooReaderWriter zoo = ZooReaderWriter.getInstance();@b@ try@b@ {@b@ fid = zoo.mutate(zTablePath, null, null, new ZooReaderWriter.Mutator(this)@b@ {@b@ public byte[] mutate() throws Exception {@b@ long flushID = Long.parseLong(new java.lang.String(currentValue, Constants.UTF8));@b@ flushID += 1L;@b@ return Long.toString(flushID).getBytes(Constants.UTF8);@b@ } } );@b@ }@b@ catch (KeeperException.NoNodeException nne) {@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.NOTFOUND, null);@b@ } catch (Exception e) {@b@ Master.access$200().warn(e.getMessage(), e);@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.OTHER, null);@b@ }@b@ return Long.parseLong(new java.lang.String(fid, Constants.UTF8));@b@ }@b@@b@ public void waitForFlush(, TCredentials c, java.lang.String tableId, ByteBuffer startRow, ByteBuffer endRow, long flushID, long maxLoops)@b@ throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, TException@b@ {@b@ Master.access$600(this.this$0).canFlush(c, tableId);@b@@b@ if ((endRow != null) && (startRow != null) && (ByteBufferUtil.toText(startRow).compareTo(ByteBufferUtil.toText(endRow)) >= 0)) {@b@ 
throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.BAD_RANGE, "start row must be less than end row");@b@ }@b@@b@ Set serversToFlush = new HashSet(Master.access$700(this.this$0).getCurrentServers());@b@@b@ for (long l = 0L; l < maxLoops; l += 1L)@b@ {@b@ for (TServerInstance instance : serversToFlush)@b@ try {@b@ LiveTServerSet.TServerConnection server = Master.access$700(this.this$0).getConnection(instance);@b@ if (server != null)@b@ server.flush(Master.access$800(this.this$0), tableId, ByteBufferUtil.toBytes(startRow), ByteBufferUtil.toBytes(endRow));@b@ } catch (TException ex) {@b@ Master.access$200().error(ex.toString());@b@ }@b@@b@@b@ if (l == maxLoops - 1L)@b@ return;@b@@b@ UtilWaitThread.sleep(50L);@b@@b@ serversToFlush.clear();@b@ try@b@ {@b@ Connector conn = this.this$0.getConnector();@b@ Scanner scanner = new IsolatedScanner(conn.createScanner("!METADATA", Constants.NO_AUTHS));@b@ Constants.METADATA_FLUSH_COLUMN.fetch(scanner);@b@ Constants.METADATA_DIRECTORY_COLUMN.fetch(scanner);@b@ scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);@b@ scanner.fetchColumnFamily(Constants.METADATA_LOG_COLUMN_FAMILY);@b@ scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange());@b@@b@ RowIterator ri = new RowIterator(scanner);@b@@b@ int tabletsToWaitFor = 0;@b@ int tabletCount = 0;@b@@b@ Text ert = ByteBufferUtil.toText(endRow);@b@@b@ while (ri.hasNext()) {@b@ Iterator row = ri.next();@b@ long tabletFlushID = -1L;@b@ int logs = 0;@b@ boolean online = false;@b@@b@ TServerInstance server = null;@b@@b@ Map.Entry entry = null;@b@ while (row.hasNext()) {@b@ entry = (Map.Entry)row.next();@b@ Key key = (Key)entry.getKey();@b@@b@ if (Constants.METADATA_FLUSH_COLUMN.equals(key.getColumnFamily(), key.getColumnQualifier())) {@b@ tabletFlushID = Long.parseLong(((Value)entry.getValue()).toString());@b@ }@b@@b@ if 
(Constants.METADATA_LOG_COLUMN_FAMILY.equals(key.getColumnFamily()))@b@ ++logs;@b@@b@ if (Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY.equals(key.getColumnFamily())) {@b@ online = true;@b@ server = new TServerInstance((Value)entry.getValue(), key.getColumnQualifier());@b@ }@b@@b@ }@b@@b@ if ((((online) || (logs > 0))) && (tabletFlushID < flushID)) {@b@ ++tabletsToWaitFor;@b@ if (server != null)@b@ serversToFlush.add(server);@b@ }@b@@b@ ++tabletCount;@b@@b@ Text tabletEndRow = new KeyExtent(((Key)entry.getKey()).getRow(), (Text)null).getEndRow();@b@ if (tabletEndRow == null) break; if ((ert != null) && (tabletEndRow.compareTo(ert) >= 0))@b@ break;@b@ }@b@@b@ if (tabletsToWaitFor == 0) {@b@ return;@b@ }@b@@b@ if ((tabletCount == 0) && (!(Tables.exists(Master.access$300(this.this$0), tableId))))@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, null, TableOperation.FLUSH, TableOperationExceptionType.NOTFOUND, null);@b@ }@b@ catch (AccumuloException e) {@b@ Master.access$200().debug("Failed to scan !METADATA table to wait for flush " + tableId, e);@b@ } catch (TabletIterator.TabletDeletedException tde) {@b@ Master.access$200().debug("Failed to scan !METADATA table to wait for flush " + tableId, tde);@b@ } catch (AccumuloSecurityException e) {@b@ Master.access$200().warn(e.getMessage(), e);@b@ throw new ThriftSecurityException();@b@ } catch (TableNotFoundException e) {@b@ Master.access$200().error(e.getMessage(), e);@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException();@b@ }@b@ }@b@ }@b@@b@ public MasterMonitorInfo getMasterStats(, TCredentials credentials)@b@ throws ThriftSecurityException, TException@b@ {@b@ MasterMonitorInfo result = new MasterMonitorInfo();@b@@b@ result.tServerInfo = new ArrayList();@b@ result.tableMap = new DefaultMap(new TableInfo());@b@ for (??? 
= Master.access$900(this.this$0).entrySet().iterator(); ???.hasNext(); ) { Map.Entry serverEntry = (Map.Entry)???.next();@b@ TabletServerStatus status = (TabletServerStatus)serverEntry.getValue();@b@ result.tServerInfo.add(status);@b@ for (Map.Entry entry : status.tableMap.entrySet()) {@b@ java.lang.String table = (java.lang.String)entry.getKey();@b@ TableInfo summary = (TableInfo)result.tableMap.get(table);@b@ Monitor.add(summary, (TableInfo)entry.getValue());@b@ }@b@ }@b@ result.badTServers = new HashMap();@b@ synchronized (Master.access$1000(this.this$0)) {@b@ for (TServerInstance bad : Master.access$1000(this.this$0).keySet())@b@ result.badTServers.put(bad.hostPort(), Byte.valueOf(TabletServerState.UNRESPONSIVE.getId()));@b@ }@b@@b@ result.state = Master.access$1100(this.this$0);@b@ result.goalState = this.this$0.getMasterGoalState();@b@ result.unassignedTablets = Master.access$1200(this.this$0);@b@ result.serversShuttingDown = new HashSet();@b@ synchronized (Master.access$1300(this.this$0)) {@b@ for (TServerInstance server : Master.access$1300(this.this$0))@b@ result.serversShuttingDown.add(server.hostPort());@b@ }@b@ DeadServerList obit = new DeadServerList(ZooUtil.getRoot(Master.access$300(this.this$0)) + "/dead/tservers");@b@ result.deadTabletServers = obit.getList();@b@ return result;@b@ }@b@@b@ private void alterTableProperty(, java.lang.String tableName, java.lang.String property, java.lang.String value, TableOperation op) throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException@b@ {@b@ java.lang.String tableId = checkTableId(tableName, op);@b@ if (!(Master.access$600(this.this$0).canAlterTable(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@ try@b@ {@b@ if ((value == null) || (value.isEmpty()))@b@ TablePropUtil.removeTableProperty(tableId, property);@b@ else if (!(TablePropUtil.setTableProperty(tableId, property, value)))@b@ throw new 
Exception("Invalid table property.");@b@ }@b@ catch (KeeperException.NoNodeException e)@b@ {@b@ checkTableId(tableName, op);@b@ Master.access$200().info("Error altering table property", e);@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, tableName, op, TableOperationExceptionType.OTHER, "Problem altering table property");@b@ } catch (Exception e) {@b@ Master.access$200().error("Problem altering table property", e);@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(tableId, tableName, op, TableOperationExceptionType.OTHER, "Problem altering table property");@b@ }@b@ }@b@@b@ public void removeTableProperty(, TCredentials credentials, java.lang.String tableName, java.lang.String property)@b@ throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, TException@b@ {@b@ alterTableProperty(credentials, tableName, property, null, TableOperation.REMOVE_PROPERTY);@b@ }@b@@b@ public void setTableProperty(, TCredentials credentials, java.lang.String tableName, java.lang.String property, java.lang.String value)@b@ throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, TException@b@ {@b@ alterTableProperty(credentials, tableName, property, value, TableOperation.SET_PROPERTY);@b@ }@b@@b@ public void shutdown(, TCredentials c, boolean stopTabletServers) throws ThriftSecurityException, TException@b@ {@b@ Master.access$600(this.this$0).canPerformSystemActions(c);@b@ Master.access$1400(this.this$0, stopTabletServers);@b@ }@b@@b@ public void shutdownTabletServer(, TCredentials c, java.lang.String tabletServer, boolean force) throws ThriftSecurityException, TException@b@ {@b@ Master.access$600(this.this$0).canPerformSystemActions(c);@b@@b@ InetSocketAddress addr = AddressUtil.parseAddress(tabletServer, Property.TSERV_CLIENTPORT);@b@ java.lang.String addrString = AddressUtil.toString(addr);@b@ 
TServerInstance doomed = Master.access$700(this.this$0).find(addrString);@b@ if (!(force)) {@b@ LiveTServerSet.TServerConnection server = Master.access$700(this.this$0).getConnection(doomed);@b@ if (server == null) {@b@ Master.access$200().warn("No server found for name " + tabletServer);@b@ return;@b@ }@b@ }@b@@b@ long tid = Master.access$1500(this.this$0).startTransaction();@b@ Master.access$1500(this.this$0).seedTransaction(tid, new TraceRepo(new ShutdownTServer(doomed, force)), false);@b@ Master.access$1500(this.this$0).waitForCompletion(tid);@b@ Master.access$1500(this.this$0).delete(tid);@b@ }@b@@b@ public void reportSplitExtent(, TCredentials credentials, java.lang.String serverName, TabletSplit split) throws TException@b@ {@b@ KeyExtent oldTablet = new KeyExtent(split.oldTablet);@b@ if (Master.access$1600(this.this$0).remove(oldTablet) != null)@b@ Master.access$200().info("Canceled migration of " + split.oldTablet);@b@@b@ for (TServerInstance instance : Master.access$700(this.this$0).getCurrentServers())@b@ if (serverName.equals(instance.hostPort())) {@b@ Master.access$100(this.this$0).event("%s reported split %s, %s", new Object[] { serverName, new KeyExtent((TKeyExtent)split.newTablets.get(0)), new KeyExtent((TKeyExtent)split.newTablets.get(1)) });@b@ return;@b@ }@b@@b@ Master.access$200().warn("Got a split from a server we don't recognize: " + serverName);@b@ }@b@@b@ public void reportTabletStatus(, TCredentials credentials, java.lang.String serverName, TabletLoadState status, TKeyExtent ttablet) throws TException@b@ {@b@ KeyExtent tablet = new KeyExtent(ttablet);@b@@b@ switch (Master.5.$SwitchMap$org$apache$accumulo$core$master$thrift$TabletLoadState[status.ordinal()])@b@ {@b@ case 1:@b@ Master.access$200().error(serverName + " reports assignment failed for tablet " + tablet);@b@ break;@b@ case 2:@b@ Master.access$100(this.this$0).event("tablet %s was loaded on %s", new Object[] { tablet, serverName });@b@ break;@b@ case 3:@b@ 
Master.access$100(this.this$0).event("tablet %s was unloaded from %s", new Object[] { tablet, serverName });@b@ break;@b@ case 4:@b@ Master.access$200().error(serverName + " reports unload failed for tablet " + tablet);@b@ break;@b@ case 5:@b@ if (!(Master.access$200().isTraceEnabled())) return;@b@ Master.access$200().trace(serverName + " reports unload failed: not serving tablet, could be a split: " + tablet); break;@b@ case 6:@b@ Master.access$100(this.this$0).event("tablet %s chopped", new Object[] { tablet });@b@ }@b@ }@b@@b@ public void setMasterGoalState(, TCredentials c, MasterGoalState state)@b@ throws ThriftSecurityException, TException@b@ {@b@ Master.access$600(this.this$0).canPerformSystemActions(c);@b@@b@ Master.access$1700(this.this$0, state);@b@ }@b@@b@ private void updatePlugins() {@b@ if (property.equals(Property.MASTER_TABLET_BALANCER.getKey())) {@b@ TabletBalancer balancer = (TabletBalancer)Master.createInstanceFromPropertyName(Master.access$300(this.this$0).getConfiguration(), Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());@b@@b@ balancer.init(Master.access$1800(this.this$0));@b@ Master.access$1902(this.this$0, balancer);@b@ Master.access$200().info("tablet balancer changed to " + Master.access$1900(this.this$0).getClass().getName());@b@ }@b@ }@b@@b@ public void removeSystemProperty(, TCredentials c, java.lang.String property) throws ThriftSecurityException, TException@b@ {@b@ Master.access$600(this.this$0).canPerformSystemActions(c);@b@ try@b@ {@b@ SystemPropUtil.removeSystemProperty(property);@b@ updatePlugins(property);@b@ } catch (Exception e) {@b@ Master.access$200().error("Problem removing config property in zookeeper", e);@b@ throw new TException(e.getMessage());@b@ }@b@ }@b@@b@ public void setSystemProperty(, TCredentials c, java.lang.String property, java.lang.String value) throws ThriftSecurityException, TException@b@ {@b@ Master.access$600(this.this$0).canPerformSystemActions(c);@b@ try@b@ {@b@ 
SystemPropUtil.setSystemProperty(property, value);@b@ updatePlugins(property);@b@ } catch (Exception e) {@b@ Master.access$200().error("Problem setting config property in zookeeper", e);@b@ throw new TException(e.getMessage());@b@ }@b@ }@b@@b@ private void authenticate() throws ThriftSecurityException {@b@ if (!(Master.access$600(this.this$0).authenticateUser(c, c)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.BAD_CREDENTIALS);@b@ }@b@@b@ public long beginTableOperation(, TCredentials credentials)@b@ throws ThriftSecurityException, TException@b@ {@b@ authenticate(credentials);@b@ return Master.access$1500(this.this$0).startTransaction();@b@ }@b@@b@ public void executeTableOperation(, TCredentials c, long opid, TableOperation op, List<ByteBuffer> arguments, Map<java.lang.String, java.lang.String> options, boolean autoCleanup)@b@ throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, TException@b@ {@b@ java.lang.String tableName;@b@ java.lang.String tableId;@b@ java.lang.String tableId;@b@ Text startRow;@b@ java.lang.String exportDir;@b@ java.lang.String tableId;@b@ Text endRow;@b@ java.lang.String tableId;@b@ authenticate(c);@b@@b@ switch (Master.5.$SwitchMap$org$apache$accumulo$core$master$thrift$TableOperation[op.ordinal()])@b@ {@b@ case 1:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ if (!(Master.access$600(this.this$0).canCreateTable(c)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@ Master.access$2000(this.this$0, tableName, TableOperation.CREATE);@b@ Master.access$2100(this.this$0, tableName, TableOperation.CREATE);@b@@b@ TimeType timeType = TimeType.valueOf(ByteBufferUtil.toString((ByteBuffer)arguments.get(1)));@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new CreateTable(c.getPrincipal(), tableName, timeType, options)), autoCleanup);@b@@b@ break;@b@ case 2:@b@ 
java.lang.String oldTableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ java.lang.String newTableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(1));@b@@b@ tableId = checkTableId(oldTableName, TableOperation.RENAME);@b@ Master.access$2000(this.this$0, oldTableName, TableOperation.RENAME);@b@ Master.access$2000(this.this$0, newTableName, TableOperation.RENAME);@b@ Master.access$2100(this.this$0, newTableName, TableOperation.RENAME);@b@ if (!(Master.access$600(this.this$0).canRenameTable(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new RenameTable(tableId, oldTableName, newTableName)), autoCleanup);@b@@b@ break;@b@ case 3:@b@ java.lang.String srcTableId = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ java.lang.String tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(1));@b@@b@ Master.access$2000(this.this$0, tableName, TableOperation.CLONE);@b@ Master.access$2100(this.this$0, tableName, TableOperation.CLONE);@b@ if (!(Master.access$600(this.this$0).canCloneTable(c, srcTableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Map propertiesToSet = new HashMap();@b@ Set propertiesToExclude = new HashSet();@b@@b@ Iterator i$ = options.entrySet().iterator();@b@ while (true) { Map.Entry entry;@b@ while (true) { if (!(i$.hasNext())) break label660; entry = (Map.Entry)i$.next();@b@ if (!(((java.lang.String)entry.getKey()).startsWith("!"))) break;@b@ propertiesToExclude.add(((java.lang.String)entry.getKey()).substring("!".length()));@b@ }@b@@b@ if (!(TablePropUtil.isPropertyValid((java.lang.String)entry.getKey(), (java.lang.String)entry.getValue()))) {@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(null, tableName, TableOperation.CLONE, TableOperationExceptionType.OTHER, "Property or value not valid " 
+ ((java.lang.String)entry.getKey()) + "=" + ((java.lang.String)entry.getValue()));@b@ }@b@@b@ propertiesToSet.put(entry.getKey(), entry.getValue());@b@ }@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new CloneTable(c.getPrincipal(), srcTableId, tableName, propertiesToSet, propertiesToExclude)), autoCleanup);@b@@b@ break;@b@ case 4:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ tableId = checkTableId(tableName, TableOperation.DELETE);@b@ Master.access$2000(this.this$0, tableName, TableOperation.DELETE);@b@ if (!(Master.access$600(this.this$0).canDeleteTable(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new DeleteTable(tableId)), autoCleanup);@b@ break;@b@ case 5:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ tableId = checkTableId(tableName, TableOperation.ONLINE);@b@ Master.access$2000(this.this$0, tableName, TableOperation.ONLINE);@b@@b@ if (!(Master.access$600(this.this$0).canOnlineOfflineTable(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new ChangeTableState(tableId, TableOperation.ONLINE)), autoCleanup);@b@ break;@b@ case 6:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ tableId = checkTableId(tableName, TableOperation.OFFLINE);@b@ Master.access$2000(this.this$0, tableName, TableOperation.OFFLINE);@b@@b@ if (!(Master.access$600(this.this$0).canOnlineOfflineTable(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new ChangeTableState(tableId, TableOperation.OFFLINE)), autoCleanup);@b@ break;@b@ case 7:@b@ tableName = 
ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ startRow = ByteBufferUtil.toText((ByteBuffer)arguments.get(1));@b@ endRow = ByteBufferUtil.toText((ByteBuffer)arguments.get(2));@b@ tableId = checkTableId(tableName, TableOperation.MERGE);@b@ if ((tableName.equals("!METADATA")) && @b@ (startRow.compareTo(new Text("0")) < 0)) {@b@ startRow = new Text("0");@b@ if ((endRow.getLength() != 0) && (endRow.compareTo(startRow) < 0))@b@ throw new org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException(null, tableName, TableOperation.MERGE, TableOperationExceptionType.OTHER, "end-row specification is in the root tablet, which cannot be merged or split");@b@@b@ }@b@@b@ Master.access$200().debug("Creating merge op: " + tableId + " " + startRow + " " + endRow);@b@@b@ if (!(Master.access$600(this.this$0).canMerge(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new TableRangeOp(MergeInfo.Operation.MERGE, tableId, startRow, endRow)), autoCleanup);@b@ break;@b@ case 8:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ startRow = ByteBufferUtil.toText((ByteBuffer)arguments.get(1));@b@ endRow = ByteBufferUtil.toText((ByteBuffer)arguments.get(2));@b@@b@ tableId = checkTableId(tableName, TableOperation.DELETE_RANGE);@b@ Master.access$2000(this.this$0, tableName, TableOperation.DELETE_RANGE);@b@@b@ if (!(Master.access$600(this.this$0).canDeleteRange(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new TableRangeOp(MergeInfo.Operation.DELETE, tableId, startRow, endRow)), autoCleanup);@b@ break;@b@ case 9:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ java.lang.String dir = ByteBufferUtil.toString((ByteBuffer)arguments.get(1));@b@ java.lang.String failDir 
= ByteBufferUtil.toString((ByteBuffer)arguments.get(2));@b@ boolean setTime = Boolean.parseBoolean(ByteBufferUtil.toString((ByteBuffer)arguments.get(3)));@b@@b@ java.lang.String tableId = checkTableId(tableName, TableOperation.BULK_IMPORT);@b@ Master.access$2000(this.this$0, tableName, TableOperation.BULK_IMPORT);@b@@b@ if (!(Master.access$600(this.this$0).canBulkImport(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new BulkImport(tableId, dir, failDir, setTime)), autoCleanup);@b@ break;@b@ case 10:@b@ tableId = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ byte[] startRow = ByteBufferUtil.toBytes((ByteBuffer)arguments.get(1));@b@ byte[] endRow = ByteBufferUtil.toBytes((ByteBuffer)arguments.get(2));@b@ List iterators = IteratorUtil.decodeIteratorSettings(ByteBufferUtil.toBytes((ByteBuffer)arguments.get(3)));@b@@b@ if (!(Master.access$600(this.this$0).canCompact(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new CompactRange(tableId, startRow, endRow, iterators)), autoCleanup);@b@ break;@b@ case 11:@b@ tableId = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@@b@ if (!(Master.access$600(this.this$0).canCompact(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new CancelCompactions(tableId)), autoCleanup);@b@ break;@b@ case 12:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ exportDir = ByteBufferUtil.toString((ByteBuffer)arguments.get(1));@b@@b@ if (!(Master.access$600(this.this$0).canImport(c)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$2000(this.this$0, 
tableName, TableOperation.CREATE);@b@ Master.access$2100(this.this$0, tableName, TableOperation.CREATE);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new ImportTable(c.getPrincipal(), tableName, exportDir)), autoCleanup);@b@ break;@b@ case 13:@b@ tableName = ByteBufferUtil.toString((ByteBuffer)arguments.get(0));@b@ exportDir = ByteBufferUtil.toString((ByteBuffer)arguments.get(1));@b@@b@ tableId = checkTableId(tableName, TableOperation.EXPORT);@b@@b@ if (!(Master.access$600(this.this$0).canExport(c, tableId)))@b@ throw new ThriftSecurityException(c.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED);@b@@b@ Master.access$2000(this.this$0, tableName, TableOperation.EXPORT);@b@@b@ Master.access$1500(this.this$0).seedTransaction(opid, new TraceRepo(new ExportTable(tableName, tableId, exportDir)), autoCleanup);@b@ break;@b@ default:@b@ label660: throw new UnsupportedOperationException();@b@ }@b@ }@b@@b@ public java.lang.String waitForTableOperation(, TCredentials credentials, long opid)@b@ throws ThriftSecurityException, org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException, TException@b@ {@b@ authenticate(credentials);@b@@b@ ReadOnlyTStore.TStatus status = Master.access$1500(this.this$0).waitForCompletion(opid);@b@ if (status == ReadOnlyTStore.TStatus.FAILED) {@b@ Exception e = Master.access$1500(this.this$0).getException(opid);@b@ if (e instanceof org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException)@b@ throw ((org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException)e);@b@ if (e instanceof ThriftSecurityException)@b@ throw ((ThriftSecurityException)e);@b@ if (e instanceof RuntimeException)@b@ throw ((RuntimeException)e);@b@@b@ throw new RuntimeException(e);@b@ }@b@@b@ java.lang.String ret = Master.access$1500(this.this$0).getReturn(opid);@b@ if (ret == null)@b@ ret = "";@b@ return ret;@b@ }@b@@b@ public void finishTableOperation(, TCredentials credentials, long opid) throws 
ThriftSecurityException, TException@b@ {@b@ authenticate(credentials);@b@ Master.access$1500(this.this$0).delete(opid);@b@ }@b@ }@b@}
2.ZooLock
package org.apache.accumulo.server.zookeeper;@b@@b@import org.apache.zookeeper.KeeperException;@b@@b@public class ZooLock extends org.apache.accumulo.fate.zookeeper.ZooLock@b@{@b@ public ZooLock(String path)@b@ {@b@ super(new ZooCache(), ZooReaderWriter.getInstance(), path);@b@ }@b@@b@ public static void deleteLock(String path) throws InterruptedException, KeeperException {@b@ deleteLock(ZooReaderWriter.getInstance(), path);@b@ }@b@@b@ public static boolean deleteLock(String path, String lockData) throws InterruptedException, KeeperException {@b@ return deleteLock(ZooReaderWriter.getInstance(), path, lockData);@b@ }@b@}
3.RecoveryManager
package org.apache.accumulo.server.master.recovery;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;

/**
 * Master-side bookkeeping for write-ahead-log recovery.
 *
 * <p>Work items are published to a {@link DistributedWorkQueue} rooted at the instance's
 * ZooKeeper root plus {@code "/recovery"}; {@link LogSortTask} closes a log via a
 * {@link LogCloser} and then queues it for sorting.
 *
 * <p>The three bookkeeping collections are plain (unsynchronized) collections; the code
 * visible here touches them only inside the constructor or while holding this object's
 * monitor.
 */
public class RecoveryManager {

  private static Logger log = Logger.getLogger(RecoveryManager.class);

  private Map<String, Long> recoveryDelay = new HashMap<String, Long>();
  private Set<String> closeTasksQueued = new HashSet<String>();
  private Set<String> sortsQueued = new HashSet<String>();
  private ScheduledExecutorService executor;
  private Master master;
  private ZooCache zooCache;

  /**
   * Creates the manager, starts a 4-thread scheduled pool and seeds {@code sortsQueued}
   * with the work ids already present in the recovery work queue.
   *
   * @param master the master this manager reads its instance, configuration and file system from
   */
  public RecoveryManager(Master master) {
    this.master = master;
    this.executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
    this.zooCache = new ZooCache();
    try {
      List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + "/recovery").getWorkQueued();
      this.sortsQueued.addAll(workIDs);
    } catch (Exception e) {
      // Best effort: failing to read the existing queue is logged, not fatal.
      log.warn(e, e);
    }
  }

  /**
   * Publishes {@code file} to the recovery work queue, with the log's source path as payload,
   * and records it in {@code sortsQueued}.
   *
   * @param host server component used to build the source path
   * @param file log file name; also used as the work id
   * @throws KeeperException propagated from the work queue
   * @throws InterruptedException propagated from the work queue
   */
  private void initiateSort(String host, String file) throws KeeperException, InterruptedException {
    String source = getSource(host, file).toString();
    new DistributedWorkQueue(ZooUtil.getRoot(this.master.getInstance()) + "/recovery").addWork(file, source.getBytes(Constants.UTF8));

    synchronized (this) {
      this.sortsQueued.add(file);
    }

    String path = ZooUtil.getRoot(this.master.getInstance()) + "/recovery" + "/" + file;
    log.info("Created zookeeper entry " + path + " with data " + source);
  }

  /**
   * Builds the path of a write-ahead log.
   *
   * @param server server component of the log reference
   * @param file log file name
   * @return {@code <walDir>/<file>} when {@code server} contains a colon,
   *         otherwise {@code <walDir>/<server>/<file>}
   */
  private Path getSource(String server, String file) {
    String source = Constants.getWalDirectory(this.master.getSystemConfiguration()) + "/" + server + "/" + file;
    if (server.contains(":")) {
      source = Constants.getWalDirectory(this.master.getSystemConfiguration()) + "/" + file;
    }
    return new Path(source);
  }

  /**
   * Walks the write-ahead logs referenced by a tablet.
   *
   * @param extent the tablet whose logs are examined
   * @param walogs groups of log references
   * @return {@code recoveryNeeded}; in this listing nothing ever sets it, so the result is
   *         always {@code false}
   * @throws IOException declared by the original signature
   */
  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
    boolean recoveryNeeded = false;
    for (Collection<String> logs : walogs) {
      for (String walog : logs) {
        // NOTE(review): the decompiled listing contains no per-log body here (it only
        // obtained logs.iterator()). As shown, nothing updates recoveryNeeded, reads
        // recoveryDelay/zooCache, or schedules a LogSortTask. TODO: restore the loop
        // body from the upstream Accumulo 1.5.2 source before relying on this method.
      }
    }
    return recoveryNeeded;
  }

  /**
   * Scheduled task that asks a {@link LogCloser} to close one log and, once closed,
   * hands the log to {@link RecoveryManager#initiateSort(String, String)}.
   */
  private class LogSortTask implements Runnable {
    private String filename;
    private String host;
    private LogCloser closer;

    /**
     * @param closer strategy used to close the log
     * @param host server component of the log reference
     * @param filename log file name
     */
    public LogSortTask(LogCloser closer, String host, String filename) {
      this.closer = closer;
      // The decompiled listing self-assigned the field here, leaving host null.
      this.host = host;
      this.filename = filename;
    }

    /**
     * Calls {@code closer.close(...)}. A positive result is treated as a delay in
     * milliseconds after which this task is rescheduled; otherwise the sort is initiated.
     * A missing file is logged at debug level, any other failure at warn level.
     */
    @Override
    public void run() {
      boolean rescheduled = false;
      try {
        FileSystem fs = master.getFileSystem();
        if (fs instanceof TraceFileSystem) {
          // Hand the closer the wrapped implementation rather than the tracing wrapper.
          fs = ((TraceFileSystem) fs).getImplementation();
        }

        long time = this.closer.close(master, fs, getSource(this.host, this.filename));

        if (time > 0L) {
          executor.schedule(this, time, TimeUnit.MILLISECONDS);
          rescheduled = true;
        } else {
          initiateSort(this.host, this.filename);
        }
      } catch (java.io.FileNotFoundException e) {
        log.debug("Unable to initate log sort for " + this.filename + ": " + e);
      } catch (Exception e) {
        log.warn("Failed to initiate log sort " + this.filename, e);
      } finally {
        if (!rescheduled) {
          synchronized (RecoveryManager.this) {
            // NOTE(review): the listing only shows a synthetic accessor (access$500)
            // whose result has remove(filename); closeTasksQueued is assumed to be the
            // target. Confirm against the upstream source.
            closeTasksQueued.remove(this.filename);
          }
        }
      }
    }
  }
}