首页

Flink源码包中运行时环境RuntimeEnvironment的定义与实现分析

标签:运行时环境,RuntimeEnvironment,flink,系统设计     发布时间:2018-05-02   

一、前言

本文介绍Flink源码包中运行环境的接口定义org.apache.flink.runtime.execution.Environment及其实现类org.apache.flink.runtime.execution.RuntimeEnvironment。运行环境的实现主要依赖IO输入输出管理器IOManager、内存管理器MemoryManager、任务执行状态观察者ExecutionObserver,以及作业配置与任务配置(Configuration)等在运行阶段需要配置和管理的部分。

二、源码说明

1.Environment接口

package org.apache.flink.runtime.execution;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.FutureTask;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;

/**
 * Contract through which a task accesses its execution environment: job and
 * task configuration, subtask position, I/O and memory managers, and the
 * input/output gates and channels registered for the task.
 *
 * <p>Interface members are implicitly {@code public abstract}, so those
 * modifiers are omitted.
 */
public interface Environment {

  /** Returns the {@link JobID} associated with this environment. */
  JobID getJobID();

  /** Returns the task-level {@link Configuration}. */
  Configuration getTaskConfiguration();

  /** Returns the job-level {@link Configuration}. */
  Configuration getJobConfiguration();

  /** Returns the current number of subtasks. */
  int getCurrentNumberOfSubtasks();

  /** Returns this task's index within its subtask group. */
  int getIndexInSubtaskGroup();

  /**
   * Callback reporting that a user thread has been started.
   *
   * @param paramThread the thread that was started
   */
  void userThreadStarted(Thread paramThread);

  /**
   * Callback reporting that a user thread has finished.
   *
   * @param paramThread the thread that finished
   */
  void userThreadFinished(Thread paramThread);

  /** Returns the {@link InputSplitProvider} of this environment. */
  InputSplitProvider getInputSplitProvider();

  /** Returns the {@link IOManager} of this environment. */
  IOManager getIOManager();

  /** Returns the {@link MemoryManager} of this environment. */
  MemoryManager getMemoryManager();

  /** Returns the name of the task. */
  String getTaskName();

  /** Returns the next unbound input gate ID. */
  GateID getNextUnboundInputGateID();

  /** Returns the number of registered output gates. */
  int getNumberOfOutputGates();

  /** Returns the number of registered input gates. */
  int getNumberOfInputGates();

  /** Returns the number of output channels. */
  int getNumberOfOutputChannels();

  /** Returns the number of input channels. */
  int getNumberOfInputChannels();

  /** Creates an {@link OutputGate}, registers it with this environment and returns it. */
  OutputGate createAndRegisterOutputGate();

  /**
   * Creates an {@link InputGate}, registers it with this environment and returns it.
   *
   * @param <T> record type of the gate
   */
  <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate();

  /** Returns the IDs of the output channels. */
  Set<ChannelID> getOutputChannelIDs();

  /** Returns the IDs of the input channels. */
  Set<ChannelID> getInputChannelIDs();

  /** Returns the IDs of the output gates. */
  Set<GateID> getOutputGateIDs();

  /** Returns the IDs of the input gates. */
  Set<GateID> getInputGateIDs();

  /**
   * Returns the IDs of the output channels of one gate.
   *
   * @param paramGateID ID of the output gate to inspect
   */
  Set<ChannelID> getOutputChannelIDsOfGate(GateID paramGateID);

  /**
   * Returns the IDs of the input channels of one gate.
   *
   * @param paramGateID ID of the input gate to inspect
   */
  Set<ChannelID> getInputChannelIDsOfGate(GateID paramGateID);

  /** Returns the {@link AccumulatorProtocol} proxy of this environment. */
  AccumulatorProtocol getAccumulatorProtocolProxy();

  /** Returns the {@link BufferProvider} used for output buffers. */
  BufferProvider getOutputBufferProvider();

  /** Returns the copy tasks, as a map from name to {@link FutureTask}. */
  Map<String, FutureTask<Path>> getCopyTask();
}

2.RuntimeEnvironment实现类

package org.apache.flink.runtime.execution;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider.BufferAvailabilityRegistration;
import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;
import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.io.network.channels.InputChannel;
import org.apache.flink.runtime.io.network.channels.OutputChannel;
import org.apache.flink.runtime.io.network.gates.GateID;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.util.StringUtils;

/**
 * {@link Environment} implementation that is also the {@link Runnable} executed
 * by the task thread and the task's output {@link BufferProvider} (every buffer
 * call is delegated to a {@link LocalBufferPool}).
 *
 * <p>The constructor instantiates the task's {@link AbstractInvokable}, lets it
 * register its gates, and then initializes the channels of those gates from the
 * {@link TaskDeploymentDescriptor}. {@link #run()} drives the invokable and
 * reports state transitions to the {@link ExecutionObserver}, if one is set.
 *
 * <p>The buffer-pool methods dereference the local pool directly, so
 * {@link #registerGlobalBufferPool(GlobalBufferPool)} must have been called
 * before any of them is used.
 */
public class RuntimeEnvironment
    implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {

  private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);

  /** Poll interval, in milliseconds, while waiting for input gates to close. */
  private static final int SLEEPINTERVAL = 100;

  private final List<OutputGate> outputGates = new CopyOnWriteArrayList<OutputGate>();
  private final List<InputGate<? extends IOReadableWritable>> inputGates =
      new CopyOnWriteArrayList<InputGate<? extends IOReadableWritable>>();
  private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
  private final MemoryManager memoryManager;
  private final IOManager ioManager;
  private final Class<? extends AbstractInvokable> invokableClass;
  private final AbstractInvokable invokable;
  private final JobID jobID;
  private final Configuration jobConfiguration;
  private final Configuration taskConfiguration;
  private final InputSplitProvider inputSplitProvider;
  private volatile ExecutionObserver executionObserver = null;
  private volatile Thread executingThread;
  private AccumulatorProtocol accumulatorProtocolProxy = null;
  private final int indexInSubtaskGroup;
  private final int currentNumberOfSubtasks;
  private final String taskName;
  private LocalBufferPool outputBufferPool;
  private final Map<String, FutureTask<Path>> cacheCopyTasks;
  private volatile boolean canceled;

  /**
   * Builds the environment from a deployment descriptor, instantiates the
   * invokable and initializes the channels of the gates it registered.
   *
   * @param tdd descriptor supplying job/task identity, configuration and gate descriptors
   * @param memoryManager memory manager exposed through {@link #getMemoryManager()}
   * @param ioManager I/O manager exposed through {@link #getIOManager()}
   * @param inputSplitProvider provider exposed through {@link #getInputSplitProvider()}
   * @param accumulatorProtocolProxy proxy exposed through {@link #getAccumulatorProtocolProxy()}
   * @param cpTasks copy tasks exposed through {@link #getCopyTask()}
   * @throws Exception if the invokable cannot be instantiated or set up
   */
  public RuntimeEnvironment(TaskDeploymentDescriptor tdd, MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider, AccumulatorProtocol accumulatorProtocolProxy, Map<String, FutureTask<Path>> cpTasks)
      throws Exception {
    this.jobID = tdd.getJobID();
    this.taskName = tdd.getTaskName();
    this.invokableClass = tdd.getInvokableClass();
    this.jobConfiguration = tdd.getJobConfiguration();
    this.taskConfiguration = tdd.getTaskConfiguration();
    this.indexInSubtaskGroup = tdd.getIndexInSubtaskGroup();
    this.currentNumberOfSubtasks = tdd.getCurrentNumberOfSubtasks();
    this.memoryManager = memoryManager;
    this.ioManager = ioManager;
    this.inputSplitProvider = inputSplitProvider;
    this.accumulatorProtocolProxy = accumulatorProtocolProxy;
    this.cacheCopyTasks = cpTasks;

    this.invokable = this.invokableClass.newInstance();
    this.invokable.setEnvironment(this);
    // The invokable registers its gates here; they are looked up by index below.
    this.invokable.registerInputOutput();

    int numOutputGates = tdd.getNumberOfOutputGateDescriptors();
    for (int i = 0; i < numOutputGates; ++i) {
      this.outputGates.get(i).initializeChannels(tdd.getOutputGateDescriptor(i));
    }

    int numInputGates = tdd.getNumberOfInputGateDescriptors();
    for (int i = 0; i < numInputGates; ++i) {
      this.inputGates.get(i).initializeChannels(tdd.getInputGateDescriptor(i));
    }
  }

  /** Returns the invokable instantiated by the constructor. */
  public AbstractInvokable getInvokable() {
    return this.invokable;
  }

  @Override
  public JobID getJobID() {
    return this.jobID;
  }

  /** Removes and returns the head of the unbound input gate ID queue, or null if it is empty. */
  @Override
  public GateID getNextUnboundInputGateID() {
    return this.unboundInputGateIDs.poll();
  }

  /** Creates an output gate whose index is the current gate count, registers and returns it. */
  @Override
  public OutputGate createAndRegisterOutputGate() {
    OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
    this.outputGates.add(gate);
    return gate;
  }

  /**
   * Executes the task: reports RUNNING, invokes the invokable, reports
   * FINISHING, closes the gates and finally reports FINISHED. Any
   * {@link Throwable} on the way releases all channel resources and is
   * reported as CANCELED (observer canceled or {@link CancelTaskException})
   * or FAILED.
   */
  @Override
  public void run() {
    // Defensive only: the constructor always assigns the invokable.
    if (this.invokable == null) {
      LOG.fatal("ExecutionEnvironment has no Invokable set");
    }

    changeExecutionState(ExecutionState.RUNNING, null);

    if (isCanceledByObserver()) {
      changeExecutionState(ExecutionState.CANCELED, null);
      return;
    }

    try {
      ClassLoader cl = LibraryCacheManager.getClassLoader(this.jobID);
      Thread.currentThread().setContextClassLoader(cl);
      this.invokable.invoke();

      if (isCanceledByObserver()) {
        throw new InterruptedException();
      }
    } catch (Throwable t) {
      // Only cancel the invokable when the failure was not itself caused by a cancel.
      if (!isCanceledByObserver()) {
        try {
          this.invokable.cancel();
        } catch (Throwable t2) {
          LOG.error(StringUtils.stringifyException(t2));
        }
      }
      releaseAndReportAbnormalEnd(t);
      return;
    }

    changeExecutionState(ExecutionState.FINISHING, null);

    try {
      closeInputGates();
      requestAllOutputGatesToClose();
      waitForInputChannelsToBeClosed();
      waitForOutputChannelsToBeClosed();
    } catch (Throwable t) {
      releaseAndReportAbnormalEnd(t);
      return;
    }

    releaseAllChannelResources();
    changeExecutionState(ExecutionState.FINISHED, null);
  }

  /** Releases all channel resources and reports CANCELED or FAILED for the given cause. */
  private void releaseAndReportAbnormalEnd(Throwable cause) {
    releaseAllChannelResources();

    if (isCanceledByObserver() || cause instanceof CancelTaskException) {
      changeExecutionState(ExecutionState.CANCELED, null);
    } else {
      changeExecutionState(ExecutionState.FAILED, StringUtils.stringifyException(cause));
    }
  }

  /**
   * Returns whether the observer reports the task as canceled. A missing
   * observer counts as "not canceled", matching the null handling in
   * {@link #changeExecutionState(ExecutionState, String)}.
   */
  private boolean isCanceledByObserver() {
    // Read the volatile field once so the null check and the call see the same object.
    ExecutionObserver observer = this.executionObserver;
    return observer != null && observer.isCanceled();
  }

  /** Creates an input gate whose index is the current gate count, registers and returns it. */
  @Override
  public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
    InputGate<T> gate = new InputGate<T>(getJobID(), new GateID(), getNumberOfInputGates());
    this.inputGates.add(gate);
    return gate;
  }

  @Override
  public int getNumberOfOutputGates() {
    return this.outputGates.size();
  }

  @Override
  public int getNumberOfInputGates() {
    return this.inputGates.size();
  }

  /** Returns the sum of the channel counts of all output gates. */
  @Override
  public int getNumberOfOutputChannels() {
    int numberOfOutputChannels = 0;
    for (OutputGate gate : this.outputGates) {
      numberOfOutputChannels += gate.getNumChannels();
    }
    return numberOfOutputChannels;
  }

  /** Returns the sum of the channel counts of all input gates. */
  @Override
  public int getNumberOfInputChannels() {
    int numberOfInputChannels = 0;
    for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
      numberOfInputChannels += gate.getNumberOfInputChannels();
    }
    return numberOfInputChannels;
  }

  /**
   * Returns the input gate at the given position.
   *
   * @param pos index of the gate
   * @return the gate, or null if {@code pos} is negative or not smaller than the gate count
   */
  public InputGate<? extends IOReadableWritable> getInputGate(int pos) {
    if (pos >= 0 && pos < this.inputGates.size()) {
      return this.inputGates.get(pos);
    }
    return null;
  }

  /**
   * Returns the output gate at the given position.
   *
   * @param index index of the gate
   * @return the gate, or null if {@code index} is negative or not smaller than the gate count
   */
  public OutputGate getOutputGate(int index) {
    if (index >= 0 && index < this.outputGates.size()) {
      return this.outputGates.get(index);
    }
    return null;
  }

  /**
   * Returns the thread that runs this environment, creating it on first call.
   * The thread is created but not started here; it is named after
   * {@link #getTaskNameWithIndex()} when a task name is set.
   */
  public Thread getExecutingThread() {
    synchronized (this) {
      if (this.executingThread == null) {
        if (this.taskName == null) {
          this.executingThread = new Thread(this);
        } else {
          this.executingThread = new Thread(this, getTaskNameWithIndex());
        }
      }
      return this.executingThread;
    }
  }

  /**
   * Marks the task as canceled, cancels the invokable and interrupts the
   * executing thread. Waits up to 5 seconds for the thread to die; if it is
   * still alive, interrupts it once more and waits up to 1 further second.
   *
   * <p>If no executing thread has been created yet, only the flag and the
   * invokable are affected. If the calling thread is interrupted while
   * waiting, its interrupt status is restored before returning.
   */
  public void cancelExecution() {
    this.canceled = true;

    LOG.info("Canceling " + getTaskNameWithIndex());

    if (this.invokable != null) {
      try {
        this.invokable.cancel();
      } catch (Throwable e) {
        LOG.error("Error while cancelling the task.", e);
      }
    }

    // Read the volatile field once; it is null until getExecutingThread() was called.
    final Thread thread = this.executingThread;
    if (thread == null) {
      return;
    }

    boolean callerInterrupted = false;
    try {
      thread.interrupt();
      try {
        thread.join(5000L);
      } catch (InterruptedException e) {
        // Remember the interrupt but keep going so the second attempt still happens.
        callerInterrupted = true;
      }

      if (!thread.isAlive()) {
        return;
      }

      LOG.warn("Task " + getTaskName() + " did not react to cancelling signal. Sending repeated interrupt.");

      if (LOG.isDebugEnabled()) {
        StringBuilder bld = new StringBuilder("Task ").append(getTaskName()).append(" is stuck in method:\n");
        for (StackTraceElement element : thread.getStackTrace()) {
          bld.append(element).append('\n');
        }
        LOG.debug(bld.toString());
      }

      thread.interrupt();
      try {
        thread.join(1000L);
      } catch (InterruptedException e) {
        callerInterrupted = true;
      }
    } finally {
      if (callerInterrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Waits for every output gate to be closed, unless the observer reports a cancel. */
  private void waitForOutputChannelsToBeClosed() throws InterruptedException {
    if (isCanceledByObserver()) {
      return;
    }

    for (OutputGate og : this.outputGates) {
      og.waitForGateToBeClosed();
    }
  }

  /**
   * Polls every {@link #SLEEPINTERVAL} milliseconds until all input gates
   * report closed. Returns early once {@link #cancelExecution()} has set the
   * canceled flag.
   *
   * @throws InterruptedException if the observer reports a cancel or the sleep is interrupted
   */
  private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
    while (!this.canceled) {
      if (isCanceledByObserver()) {
        throw new InterruptedException();
      }

      boolean allClosed = true;
      // Every gate is queried on each pass, as in the original loop (no early exit).
      for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
        if (!gate.isClosed()) {
          allClosed = false;
        }
      }

      if (allClosed) {
        return;
      }

      Thread.sleep(SLEEPINTERVAL);
    }
  }

  /** Closes every input gate. */
  private void closeInputGates() throws IOException, InterruptedException {
    for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
      gate.close();
    }
  }

  /** Requests every output gate to close. */
  private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
    for (OutputGate gate : this.outputGates) {
      gate.requestClose();
    }
  }

  @Override
  public IOManager getIOManager() {
    return this.ioManager;
  }

  @Override
  public MemoryManager getMemoryManager() {
    return this.memoryManager;
  }

  @Override
  public Configuration getTaskConfiguration() {
    return this.taskConfiguration;
  }

  @Override
  public Configuration getJobConfiguration() {
    return this.jobConfiguration;
  }

  @Override
  public int getCurrentNumberOfSubtasks() {
    return this.currentNumberOfSubtasks;
  }

  @Override
  public int getIndexInSubtaskGroup() {
    return this.indexInSubtaskGroup;
  }

  /** Forwards a state change to the observer; does nothing when no observer is set. */
  private void changeExecutionState(ExecutionState newExecutionState, String optionalMessage) {
    ExecutionObserver observer = this.executionObserver;
    if (observer != null) {
      observer.executionStateChanged(newExecutionState, optionalMessage);
    }
  }

  @Override
  public String getTaskName() {
    return this.taskName;
  }

  /** Returns the task name followed by "(index+1/numberOfSubtasks)". */
  public String getTaskNameWithIndex() {
    return String.format("%s (%d/%d)", this.taskName, getIndexInSubtaskGroup() + 1, getCurrentNumberOfSubtasks());
  }

  /** Sets the observer that receives state changes and user-thread notifications. */
  public void setExecutionObserver(ExecutionObserver executionObserver) {
    this.executionObserver = executionObserver;
  }

  @Override
  public InputSplitProvider getInputSplitProvider() {
    return this.inputSplitProvider;
  }

  /** Forwards the notification to the observer; does nothing when no observer is set. */
  @Override
  public void userThreadStarted(Thread userThread) {
    ExecutionObserver observer = this.executionObserver;
    if (observer != null) {
      observer.userThreadStarted(userThread);
    }
  }

  /** Forwards the notification to the observer; does nothing when no observer is set. */
  @Override
  public void userThreadFinished(Thread userThread) {
    ExecutionObserver observer = this.executionObserver;
    if (observer != null) {
      observer.userThreadFinished(userThread);
    }
  }

  /** Releases the channel resources of all input gates, then of all output gates. */
  private void releaseAllChannelResources() {
    for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
      gate.releaseAllChannelResources();
    }

    for (OutputGate gate : this.outputGates) {
      gate.releaseAllChannelResources();
    }
  }

  /** Returns an unmodifiable set with the IDs of the channels of all output gates. */
  @Override
  public Set<ChannelID> getOutputChannelIDs() {
    Set<ChannelID> ids = new HashSet<ChannelID>();

    for (OutputGate gate : this.outputGates) {
      for (OutputChannel channel : gate.channels()) {
        ids.add(channel.getID());
      }
    }

    return Collections.unmodifiableSet(ids);
  }

  /** Returns an unmodifiable set with the IDs of the channels of all input gates. */
  @Override
  public Set<ChannelID> getInputChannelIDs() {
    Set<ChannelID> inputChannelIDs = new HashSet<ChannelID>();

    for (InputGate<? extends IOReadableWritable> inputGate : this.inputGates) {
      for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {
        inputChannelIDs.add(inputGate.getInputChannel(i).getID());
      }
    }

    return Collections.unmodifiableSet(inputChannelIDs);
  }

  /** Returns an unmodifiable set with the IDs of all input gates. */
  @Override
  public Set<GateID> getInputGateIDs() {
    Set<GateID> inputGateIDs = new HashSet<GateID>();

    for (InputGate<? extends IOReadableWritable> gate : this.inputGates) {
      inputGateIDs.add(gate.getGateID());
    }

    return Collections.unmodifiableSet(inputGateIDs);
  }

  /** Returns an unmodifiable set with the IDs of all output gates. */
  @Override
  public Set<GateID> getOutputGateIDs() {
    Set<GateID> outputGateIDs = new HashSet<GateID>();

    for (OutputGate gate : this.outputGates) {
      outputGateIDs.add(gate.getGateID());
    }

    return Collections.unmodifiableSet(outputGateIDs);
  }

  /**
   * Returns an unmodifiable set with the channel IDs of one output gate.
   *
   * @param gateID ID of the output gate
   * @throws IllegalArgumentException if no registered output gate has that ID
   */
  @Override
  public Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID) {
    OutputGate outputGate = null;
    for (OutputGate candidateGate : this.outputGates) {
      if (candidateGate.getGateID().equals(gateID)) {
        outputGate = candidateGate;
        break;
      }
    }

    if (outputGate == null) {
      throw new IllegalArgumentException("Cannot find output gate with ID " + gateID);
    }

    Set<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
    for (int i = 0; i < outputGate.getNumChannels(); ++i) {
      outputChannelIDs.add(outputGate.getChannel(i).getID());
    }

    return Collections.unmodifiableSet(outputChannelIDs);
  }

  /**
   * Returns an unmodifiable set with the channel IDs of one input gate.
   *
   * @param gateID ID of the input gate
   * @throws IllegalArgumentException if no registered input gate has that ID
   */
  @Override
  public Set<ChannelID> getInputChannelIDsOfGate(GateID gateID) {
    InputGate<? extends IOReadableWritable> inputGate = null;
    for (InputGate<? extends IOReadableWritable> candidateGate : this.inputGates) {
      if (candidateGate.getGateID().equals(gateID)) {
        inputGate = candidateGate;
        break;
      }
    }

    if (inputGate == null) {
      throw new IllegalArgumentException("Cannot find input gate with ID " + gateID);
    }

    Set<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
    for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {
      inputChannelIDs.add(inputGate.getInputChannel(i).getID());
    }

    return Collections.unmodifiableSet(inputChannelIDs);
  }

  /** Returns the live list of registered output gates (not a copy). */
  public List<OutputGate> outputGates() {
    return this.outputGates;
  }

  /** Returns the live list of registered input gates (not a copy). */
  public List<InputGate<? extends IOReadableWritable>> inputGates() {
    return this.inputGates;
  }

  @Override
  public AccumulatorProtocol getAccumulatorProtocolProxy() {
    return this.accumulatorProtocolProxy;
  }

  /** Stores a copy task under the given name in the map passed to the constructor. */
  public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
    this.cacheCopyTasks.put(name, copyTask);
  }

  /** Returns the copy-task map passed to the constructor (not a copy). */
  @Override
  public Map<String, FutureTask<Path>> getCopyTask() {
    return this.cacheCopyTasks;
  }

  /** Returns this object, which serves as the output buffer provider itself. */
  @Override
  public BufferProvider getOutputBufferProvider() {
    return this;
  }

  /** Delegates to the local output buffer pool. */
  public Buffer requestBuffer(int minBufferSize) throws IOException {
    return this.outputBufferPool.requestBuffer(minBufferSize);
  }

  /** Delegates to the local output buffer pool. */
  public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
    return this.outputBufferPool.requestBufferBlocking(minBufferSize);
  }

  /** Delegates to the local output buffer pool. */
  public int getBufferSize() {
    return this.outputBufferPool.getBufferSize();
  }

  /** Delegates to the local output buffer pool. */
  public BufferProvider.BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
    return this.outputBufferPool.registerBufferAvailabilityListener(listener);
  }

  /** Returns the number of output channels, see {@link #getNumberOfOutputChannels()}. */
  public int getNumberOfChannels() {
    return getNumberOfOutputChannels();
  }

  /** Passes the designated buffer count on to the local output buffer pool. */
  public void setDesignatedNumberOfBuffers(int numBuffers) {
    this.outputBufferPool.setNumDesignatedBuffers(numBuffers);
  }

  /** Destroys the local output buffer pool. */
  public void clearLocalBufferPool() {
    this.outputBufferPool.destroy();
  }

  /** Creates the local output buffer pool on first call; later calls are ignored. */
  public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
    if (this.outputBufferPool == null) {
      this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
    }
  }

  /** Logs the available, requested and designated buffer counts of the local pool at INFO level. */
  public void logBufferUtilization() {
    LOG.info(String.format("\t%s: %d available, %d requested, %d designated", getTaskNameWithIndex(), this.outputBufferPool.numAvailableBuffers(), this.outputBufferPool.numRequestedBuffers(), this.outputBufferPool.numDesignatedBuffers()));
  }

  /** Delegates to the local output buffer pool. */
  public void reportAsynchronousEvent() {
    this.outputBufferPool.reportAsynchronousEvent();
  }
}