一、前言
关于flink源码包中的org.apache.flink.runtime.execution.Environment、org.apache.flink.runtime.execution.RuntimeEnvironment运行环境定义及实现,其中实现运行环境主要依赖IO输入输出管理IOManager、内存管理MemoryManager、任务执行监控管理executionObserver、定时任务配置等在运行阶段需要配置管理的部分。
二、源码说明
1.Environment接口
package org.apache.flink.runtime.execution;@b@@b@import java.util.Map;@b@import java.util.Set;@b@import java.util.concurrent.FutureTask;@b@import org.apache.flink.configuration.Configuration;@b@import org.apache.flink.core.fs.Path;@b@import org.apache.flink.core.io.IOReadableWritable;@b@import org.apache.flink.runtime.io.disk.iomanager.IOManager;@b@import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;@b@import org.apache.flink.runtime.io.network.channels.ChannelID;@b@import org.apache.flink.runtime.io.network.gates.GateID;@b@import org.apache.flink.runtime.io.network.gates.InputGate;@b@import org.apache.flink.runtime.io.network.gates.OutputGate;@b@import org.apache.flink.runtime.jobgraph.JobID;@b@import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;@b@import org.apache.flink.runtime.memorymanager.MemoryManager;@b@import org.apache.flink.runtime.protocols.AccumulatorProtocol;@b@@b@public abstract interface Environment@b@{@b@ public abstract JobID getJobID();@b@@b@ public abstract Configuration getTaskConfiguration();@b@@b@ public abstract Configuration getJobConfiguration();@b@@b@ public abstract int getCurrentNumberOfSubtasks();@b@@b@ public abstract int getIndexInSubtaskGroup();@b@@b@ public abstract void userThreadStarted(Thread paramThread);@b@@b@ public abstract void userThreadFinished(Thread paramThread);@b@@b@ public abstract InputSplitProvider getInputSplitProvider();@b@@b@ public abstract IOManager getIOManager();@b@@b@ public abstract MemoryManager getMemoryManager();@b@@b@ public abstract String getTaskName();@b@@b@ public abstract GateID getNextUnboundInputGateID();@b@@b@ public abstract int getNumberOfOutputGates();@b@@b@ public abstract int getNumberOfInputGates();@b@@b@ public abstract int getNumberOfOutputChannels();@b@@b@ public abstract int getNumberOfInputChannels();@b@@b@ public abstract OutputGate createAndRegisterOutputGate();@b@@b@ public abstract <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate();@b@@b@ public abstract Set<ChannelID> getOutputChannelIDs();@b@@b@ public abstract Set<ChannelID> getInputChannelIDs();@b@@b@ public abstract Set<GateID> getOutputGateIDs();@b@@b@ public abstract Set<GateID> getInputGateIDs();@b@@b@ public abstract Set<ChannelID> getOutputChannelIDsOfGate(GateID paramGateID);@b@@b@ public abstract Set<ChannelID> getInputChannelIDsOfGate(GateID paramGateID);@b@@b@ public abstract AccumulatorProtocol getAccumulatorProtocolProxy();@b@@b@ public abstract BufferProvider getOutputBufferProvider();@b@@b@ public abstract Map<String, FutureTask<Path>> getCopyTask();@b@}
2.RuntimeEnvironment实现类
package org.apache.flink.runtime.execution;@b@@b@import java.io.IOException;@b@import java.util.ArrayDeque;@b@import java.util.Collections;@b@import java.util.HashSet;@b@import java.util.Iterator;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Queue;@b@import java.util.Set;@b@import java.util.concurrent.CopyOnWriteArrayList;@b@import java.util.concurrent.FutureTask;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@import org.apache.flink.configuration.Configuration;@b@import org.apache.flink.core.fs.Path;@b@import org.apache.flink.core.io.IOReadableWritable;@b@import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;@b@import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;@b@import org.apache.flink.runtime.io.disk.iomanager.IOManager;@b@import org.apache.flink.runtime.io.network.Buffer;@b@import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;@b@import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;@b@import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;@b@import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;@b@import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;@b@import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;@b@import org.apache.flink.runtime.io.network.channels.ChannelID;@b@import org.apache.flink.runtime.io.network.channels.InputChannel;@b@import org.apache.flink.runtime.io.network.channels.OutputChannel;@b@import org.apache.flink.runtime.io.network.gates.GateID;@b@import org.apache.flink.runtime.io.network.gates.InputGate;@b@import org.apache.flink.runtime.io.network.gates.OutputGate;@b@import org.apache.flink.runtime.jobgraph.JobID;@b@import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;@b@import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;@b@import org.apache.flink.runtime.memorymanager.MemoryManager;@b@import org.apache.flink.runtime.protocols.AccumulatorProtocol;@b@import org.apache.flink.util.StringUtils;@b@@b@public class RuntimeEnvironment@b@ implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable@b@{@b@ private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);@b@ private static final int SLEEPINTERVAL = 100;@b@ private final List<OutputGate> outputGates = new CopyOnWriteArrayList();@b@ private final List<InputGate<? extends IOReadableWritable>> inputGates = new CopyOnWriteArrayList();@b@ private final Queue<GateID> unboundInputGateIDs = new ArrayDeque();@b@ private final MemoryManager memoryManager;@b@ private final IOManager ioManager;@b@ private final Class<? extends AbstractInvokable> invokableClass;@b@ private final AbstractInvokable invokable;@b@ private final JobID jobID;@b@ private final Configuration jobConfiguration;@b@ private final Configuration taskConfiguration;@b@ private final InputSplitProvider inputSplitProvider;@b@ private volatile ExecutionObserver executionObserver = null;@b@ private volatile Thread executingThread;@b@ private AccumulatorProtocol accumulatorProtocolProxy = null;@b@ private final int indexInSubtaskGroup;@b@ private final int currentNumberOfSubtasks;@b@ private final String taskName;@b@ private LocalBufferPool outputBufferPool;@b@ private final Map<String, FutureTask<Path>> cacheCopyTasks;@b@ private volatile boolean canceled;@b@@b@ public RuntimeEnvironment(TaskDeploymentDescriptor tdd, MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider, AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks)@b@ throws Exception@b@ {@b@ this.jobID = tdd.getJobID();@b@ this.taskName = tdd.getTaskName();@b@ this.invokableClass = tdd.getInvokableClass();@b@ this.jobConfiguration = tdd.getJobConfiguration();@b@ this.taskConfiguration = tdd.getTaskConfiguration();@b@ this.indexInSubtaskGroup = tdd.getIndexInSubtaskGroup();@b@ this.currentNumberOfSubtasks = tdd.getCurrentNumberOfSubtasks();@b@ this.memoryManager = memoryManager;@b@ this.ioManager = ioManager;@b@ this.inputSplitProvider = inputSplitProvider;@b@ this.accumulatorProtocolProxy = accumulatorProtocolProxy;@b@ this.cacheCopyTasks = cpTasks;@b@@b@ this.invokable = ((AbstractInvokable)this.invokableClass.newInstance());@b@ this.invokable.setEnvironment(this);@b@ this.invokable.registerInputOutput();@b@@b@ int numOutputGates = tdd.getNumberOfOutputGateDescriptors();@b@@b@ for (int i = 0; i < numOutputGates; ++i) {@b@ ((OutputGate)this.outputGates.get(i)).initializeChannels(tdd.getOutputGateDescriptor(i));@b@ }@b@@b@ int numInputGates = tdd.getNumberOfInputGateDescriptors();@b@@b@ for (int i = 0; i < numInputGates; ++i)@b@ ((InputGate)this.inputGates.get(i)).initializeChannels(tdd.getInputGateDescriptor(i));@b@ }@b@@b@ public AbstractInvokable getInvokable()@b@ {@b@ return this.invokable;@b@ }@b@@b@ public JobID getJobID()@b@ {@b@ return this.jobID;@b@ }@b@@b@ public GateID getNextUnboundInputGateID()@b@ {@b@ return ((GateID)this.unboundInputGateIDs.poll());@b@ }@b@@b@ public OutputGate createAndRegisterOutputGate()@b@ {@b@ OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());@b@ this.outputGates.add(gate);@b@@b@ return gate;@b@ }@b@@b@ public void run()@b@ {@b@ if (this.invokable == null) {@b@ LOG.fatal("ExecutionEnvironment has no Invokable set");@b@ }@b@@b@ changeExecutionState(ExecutionState.RUNNING, null);@b@@b@ if (this.executionObserver.isCanceled()) {@b@ changeExecutionState(ExecutionState.CANCELED, null);@b@ return;@b@ }@b@ try@b@ {@b@ ClassLoader cl = LibraryCacheManager.getClassLoader(this.jobID);@b@ Thread.currentThread().setContextClassLoader(cl);@b@ this.invokable.invoke();@b@@b@ if (this.executionObserver.isCanceled())@b@ throw new InterruptedException();@b@ }@b@ catch (Throwable t) {@b@ if (!(this.executionObserver.isCanceled()))@b@ {@b@ try@b@ {@b@ this.invokable.cancel();@b@ } catch (Throwable t2) {@b@ LOG.error(StringUtils.stringifyException(t2));@b@ }@b@@b@ }@b@@b@ releaseAllChannelResources();@b@@b@ if ((this.executionObserver.isCanceled()) || (t instanceof CancelTaskException)) {@b@ changeExecutionState(ExecutionState.CANCELED, null);@b@ }@b@ else {@b@ changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));@b@ }@b@@b@ return;@b@ }@b@@b@ changeExecutionState(ExecutionState.FINISHING, null);@b@ try@b@ {@b@ closeInputGates();@b@@b@ requestAllOutputGatesToClose();@b@@b@ waitForInputChannelsToBeClosed();@b@@b@ waitForOutputChannelsToBeClosed();@b@ }@b@ catch (Throwable t)@b@ {@b@ releaseAllChannelResources();@b@@b@ if ((this.executionObserver.isCanceled()) || (t instanceof CancelTaskException)) {@b@ changeExecutionState(ExecutionState.CANCELED, null);@b@ }@b@ else {@b@ changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(t));@b@ }@b@@b@ return;@b@ }@b@@b@ releaseAllChannelResources();@b@@b@ changeExecutionState(ExecutionState.FINISHED, null);@b@ }@b@@b@ public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate()@b@ {@b@ InputGate gate = new InputGate(getJobID(), new GateID(), getNumberOfInputGates());@b@ this.inputGates.add(gate);@b@@b@ return gate;@b@ }@b@@b@ public int getNumberOfOutputGates() {@b@ return this.outputGates.size();@b@ }@b@@b@ public int getNumberOfInputGates()@b@ {@b@ return this.inputGates.size();@b@ }@b@@b@ public int getNumberOfOutputChannels()@b@ {@b@ int numberOfOutputChannels = 0;@b@ for (int i = 0; i < this.outputGates.size(); ++i) {@b@ numberOfOutputChannels += ((OutputGate)this.outputGates.get(i)).getNumChannels();@b@ }@b@@b@ return numberOfOutputChannels;@b@ }@b@@b@ public int getNumberOfInputChannels()@b@ {@b@ int numberOfInputChannels = 0;@b@ for (int i = 0; i < this.inputGates.size(); ++i) {@b@ numberOfInputChannels += ((InputGate)this.inputGates.get(i)).getNumberOfInputChannels();@b@ }@b@@b@ return numberOfInputChannels;@b@ }@b@@b@ public InputGate<? extends IOReadableWritable> getInputGate(int pos)@b@ {@b@ if (pos < this.inputGates.size()) {@b@ return ((InputGate)this.inputGates.get(pos));@b@ }@b@@b@ return null;@b@ }@b@@b@ public OutputGate getOutputGate(int index)@b@ {@b@ if (index < this.outputGates.size()) {@b@ return ((OutputGate)this.outputGates.get(index));@b@ }@b@@b@ return null;@b@ }@b@@b@ public Thread getExecutingThread()@b@ {@b@ synchronized (this)@b@ {@b@ if (this.executingThread == null) {@b@ if (this.taskName == null) {@b@ this.executingThread = new Thread(this);@b@ }@b@ else@b@ this.executingThread = new Thread(this, getTaskNameWithIndex());@b@@b@ }@b@@b@ return this.executingThread;@b@ }@b@ }@b@@b@ public void cancelExecution() {@b@ this.canceled = true;@b@@b@ LOG.info(new StringBuilder().append("Canceling ").append(getTaskNameWithIndex()).toString());@b@@b@ if (this.invokable != null) {@b@ try {@b@ this.invokable.cancel();@b@ } catch (Throwable e) {@b@ LOG.error("Error while cancelling the task.", e);@b@ }@b@@b@ }@b@@b@ this.executingThread.interrupt();@b@ try@b@ {@b@ this.executingThread.join(5000L);@b@ } catch (InterruptedException e) {@b@ }@b@ if (!(this.executingThread.isAlive())) {@b@ return;@b@ }@b@@b@ if ((this.executingThread != null) && (this.executingThread.isAlive())) {@b@ LOG.warn(new StringBuilder().append("Task ").append(getTaskName()).append(" did not react to cancelling signal. Sending repeated interrupt.").toString());@b@@b@ if (LOG.isDebugEnabled()) {@b@ StringBuilder bld = new StringBuilder("Task ").append(getTaskName()).append(" is stuck in method:\n");@b@@b@ StackTraceElement[] stack = this.executingThread.getStackTrace();@b@ StackTraceElement[] arr$ = stack; int len$ = arr$.length; for (int i$ = 0; i$ < len$; ++i$) { StackTraceElement e = arr$[i$];@b@ bld.append(e).append('\n');@b@ }@b@ LOG.debug(bld.toString());@b@ }@b@@b@ this.executingThread.interrupt();@b@ try@b@ {@b@ this.executingThread.join(1000L);@b@ }@b@ catch (InterruptedException bld)@b@ {@b@ }@b@ }@b@ }@b@@b@ private void waitForOutputChannelsToBeClosed()@b@ throws InterruptedException@b@ {@b@ if (this.executionObserver.isCanceled()) {@b@ return;@b@ }@b@@b@ for (OutputGate og : this.outputGates)@b@ og.waitForGateToBeClosed();@b@ }@b@@b@ private void waitForInputChannelsToBeClosed()@b@ throws IOException, InterruptedException@b@ {@b@ while (!(this.canceled))@b@ {@b@ if (this.executionObserver.isCanceled()) {@b@ throw new InterruptedException();@b@ }@b@@b@ boolean allClosed = true;@b@ for (int i = 0; i < getNumberOfInputGates(); ++i) {@b@ InputGate eig = (InputGate)this.inputGates.get(i);@b@ if (!(eig.isClosed()))@b@ allClosed = false;@b@@b@ }@b@@b@ if (allClosed) {@b@ return;@b@ }@b@@b@ Thread.sleep(100L);@b@ }@b@ }@b@@b@ private void closeInputGates()@b@ throws IOException, InterruptedException@b@ {@b@ for (int i = 0; i < this.inputGates.size(); ++i) {@b@ InputGate eig = (InputGate)this.inputGates.get(i);@b@@b@ eig.close();@b@ }@b@ }@b@@b@ private void requestAllOutputGatesToClose()@b@ throws IOException, InterruptedException@b@ {@b@ for (int i = 0; i < this.outputGates.size(); ++i)@b@ ((OutputGate)this.outputGates.get(i)).requestClose();@b@ }@b@@b@ public IOManager getIOManager()@b@ {@b@ return this.ioManager;@b@ }@b@@b@ public MemoryManager getMemoryManager()@b@ {@b@ return this.memoryManager;@b@ }@b@@b@ public Configuration getTaskConfiguration()@b@ {@b@ return this.taskConfiguration;@b@ }@b@@b@ public Configuration getJobConfiguration()@b@ {@b@ return this.jobConfiguration;@b@ }@b@@b@ public int getCurrentNumberOfSubtasks()@b@ {@b@ return this.currentNumberOfSubtasks;@b@ }@b@@b@ public int getIndexInSubtaskGroup()@b@ {@b@ return this.indexInSubtaskGroup;@b@ }@b@@b@ private void changeExecutionState(ExecutionState newExecutionState, String optionalMessage) {@b@ if (this.executionObserver != null)@b@ this.executionObserver.executionStateChanged(newExecutionState, optionalMessage);@b@ }@b@@b@ public String getTaskName()@b@ {@b@ return this.taskName;@b@ }@b@@b@ public String getTaskNameWithIndex()@b@ {@b@ return String.format("%s (%d/%d)", new Object[] { this.taskName, Integer.valueOf(getIndexInSubtaskGroup() + 1), Integer.valueOf(getCurrentNumberOfSubtasks()) });@b@ }@b@@b@ public void setExecutionObserver(ExecutionObserver executionObserver)@b@ {@b@ this.executionObserver = executionObserver;@b@ }@b@@b@ public InputSplitProvider getInputSplitProvider()@b@ {@b@ return this.inputSplitProvider;@b@ }@b@@b@ public void userThreadStarted(Thread userThread)@b@ {@b@ if (this.executionObserver != null)@b@ this.executionObserver.userThreadStarted(userThread);@b@ }@b@@b@ public void userThreadFinished(Thread userThread)@b@ {@b@ if (this.executionObserver != null)@b@ this.executionObserver.userThreadFinished(userThread);@b@ }@b@@b@ private void releaseAllChannelResources()@b@ {@b@ for (int i = 0; i < this.inputGates.size(); ++i) {@b@ ((InputGate)this.inputGates.get(i)).releaseAllChannelResources();@b@ }@b@@b@ for (i = 0; i < this.outputGates.size(); ++i)@b@ ((OutputGate)this.outputGates.get(i)).releaseAllChannelResources();@b@ }@b@@b@ public Set<ChannelID> getOutputChannelIDs()@b@ {@b@ OutputChannel[] arr$;@b@ int i$;@b@ Set ids = new HashSet();@b@@b@ for (OutputGate gate : this.outputGates) {@b@ arr$ = gate.channels(); int len$ = arr$.length; for (i$ = 0; i$ < len$; ++i$) { OutputChannel channel = arr$[i$];@b@ ids.add(channel.getID());@b@ }@b@ }@b@@b@ return Collections.unmodifiableSet(ids);@b@ }@b@@b@ public Set<ChannelID> getInputChannelIDs()@b@ {@b@ InputGate outputGate;@b@ int i;@b@ Set inputChannelIDs = new HashSet();@b@@b@ Iterator gateIterator = this.inputGates.iterator();@b@ while (gateIterator.hasNext())@b@ {@b@ outputGate = (InputGate)gateIterator.next();@b@ for (i = 0; i < outputGate.getNumberOfInputChannels(); ++i)@b@ inputChannelIDs.add(outputGate.getInputChannel(i).getID());@b@@b@ }@b@@b@ return Collections.unmodifiableSet(inputChannelIDs);@b@ }@b@@b@ public Set<GateID> getInputGateIDs()@b@ {@b@ Set inputGateIDs = new HashSet();@b@@b@ Iterator gateIterator = this.inputGates.iterator();@b@ while (gateIterator.hasNext()) {@b@ inputGateIDs.add(((InputGate)gateIterator.next()).getGateID());@b@ }@b@@b@ return Collections.unmodifiableSet(inputGateIDs);@b@ }@b@@b@ public Set<GateID> getOutputGateIDs()@b@ {@b@ Set outputGateIDs = new HashSet();@b@@b@ Iterator gateIterator = this.outputGates.iterator();@b@ while (gateIterator.hasNext()) {@b@ outputGateIDs.add(((OutputGate)gateIterator.next()).getGateID());@b@ }@b@@b@ return Collections.unmodifiableSet(outputGateIDs);@b@ }@b@@b@ public Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID)@b@ {@b@ OutputGate outputGate = null;@b@ Iterator gateIterator = this.outputGates.iterator();@b@ while (gateIterator.hasNext()) {@b@ OutputGate candidateGate = (OutputGate)gateIterator.next();@b@ if (candidateGate.getGateID().equals(gateID)) {@b@ outputGate = candidateGate;@b@ break;@b@ }@b@ }@b@@b@ if (outputGate == null) {@b@ throw new IllegalArgumentException(new StringBuilder().append("Cannot find output gate with ID ").append(gateID).toString());@b@ }@b@@b@ Set outputChannelIDs = new HashSet();@b@@b@ for (int i = 0; i < outputGate.getNumChannels(); ++i) {@b@ outputChannelIDs.add(outputGate.getChannel(i).getID());@b@ }@b@@b@ return Collections.unmodifiableSet(outputChannelIDs);@b@ }@b@@b@ public Set<ChannelID> getInputChannelIDsOfGate(GateID gateID)@b@ {@b@ InputGate inputGate = null;@b@ Iterator gateIterator = this.inputGates.iterator();@b@ while (gateIterator.hasNext()) {@b@ InputGate candidateGate = (InputGate)gateIterator.next();@b@ if (candidateGate.getGateID().equals(gateID)) {@b@ inputGate = candidateGate;@b@ break;@b@ }@b@ }@b@@b@ if (inputGate == null) {@b@ throw new IllegalArgumentException(new StringBuilder().append("Cannot find input gate with ID ").append(gateID).toString());@b@ }@b@@b@ Set inputChannelIDs = new HashSet();@b@@b@ for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {@b@ inputChannelIDs.add(inputGate.getInputChannel(i).getID());@b@ }@b@@b@ return Collections.unmodifiableSet(inputChannelIDs);@b@ }@b@@b@ public List<OutputGate> outputGates() {@b@ return this.outputGates;@b@ }@b@@b@ public List<InputGate<? extends IOReadableWritable>> inputGates() {@b@ return this.inputGates;@b@ }@b@@b@ public AccumulatorProtocol getAccumulatorProtocolProxy()@b@ {@b@ return this.accumulatorProtocolProxy;@b@ }@b@@b@ public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {@b@ this.cacheCopyTasks.put(name, copyTask);@b@ }@b@@b@ public Map<String, FutureTask<Path>> getCopyTask()@b@ {@b@ return this.cacheCopyTasks;@b@ }@b@@b@ public BufferProvider getOutputBufferProvider()@b@ {@b@ return this;@b@ }@b@@b@ public Buffer requestBuffer(int minBufferSize)@b@ throws IOException@b@ {@b@ return this.outputBufferPool.requestBuffer(minBufferSize);@b@ }@b@@b@ public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException@b@ {@b@ return this.outputBufferPool.requestBufferBlocking(minBufferSize);@b@ }@b@@b@ public int getBufferSize()@b@ {@b@ return this.outputBufferPool.getBufferSize();@b@ }@b@@b@ public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener)@b@ {@b@ return this.outputBufferPool.registerBufferAvailabilityListener(listener);@b@ }@b@@b@ public int getNumberOfChannels()@b@ {@b@ return getNumberOfOutputChannels();@b@ }@b@@b@ public void setDesignatedNumberOfBuffers(int numBuffers)@b@ {@b@ this.outputBufferPool.setNumDesignatedBuffers(numBuffers);@b@ }@b@@b@ public void clearLocalBufferPool()@b@ {@b@ this.outputBufferPool.destroy();@b@ }@b@@b@ public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool)@b@ {@b@ if (this.outputBufferPool == null)@b@ this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);@b@ }@b@@b@ public void logBufferUtilization()@b@ {@b@ LOG.info(String.format("\t%s: %d available, %d requested, %d designated", new Object[] { getTaskNameWithIndex(), Integer.valueOf(this.outputBufferPool.numAvailableBuffers()), Integer.valueOf(this.outputBufferPool.numRequestedBuffers()), Integer.valueOf(this.outputBufferPool.numDesignatedBuffers()) }));@b@ }@b@@b@ public void reportAsynchronousEvent()@b@ {@b@ this.outputBufferPool.reportAsynchronousEvent();@b@ }@b@}