首页

关于flink源码包中NetUtils网络工具类获取网络地址、Socket输入流、SocketChannel创建链接等操作

标签:NetUtils,网络工具类,flink     发布时间:2018-05-01   

一、前言

关于flink源码包中org.apache.flink.runtime.net.NetUtils网络工具类,获取网络地址InetSocketAddress、Socket输入InputStream输出OutputStream流、基于SocketChannel创建Socket链接等。

二、源码说明

1.NetUtils工具类

package org.apache.flink.runtime.net;@b@@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.net.InetAddress;@b@import java.net.InetSocketAddress;@b@import java.net.Socket;@b@import java.net.SocketAddress;@b@import java.net.URI;@b@import java.net.URISyntaxException;@b@import java.net.UnknownHostException;@b@import java.nio.channels.SocketChannel;@b@import java.util.ArrayList;@b@import java.util.Collection;@b@import java.util.HashMap;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import java.util.Set;@b@import javax.net.SocketFactory;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@import org.apache.flink.runtime.ipc.Server;@b@@b@public class NetUtils@b@{@b@  private static final Log LOG = LogFactory.getLog(NetUtils.class);@b@  private static Map<String, String> hostToResolved = new HashMap();@b@@b@  public static SocketFactory getSocketFactory()@b@  {@b@    return getDefaultSocketFactory();@b@  }@b@@b@  public static SocketFactory getDefaultSocketFactory()@b@  {@b@    return SocketFactory.getDefault();@b@  }@b@@b@  public static InetSocketAddress createSocketAddr(String target)@b@  {@b@    return createSocketAddr(target, -1);@b@  }@b@@b@  public static InetSocketAddress createSocketAddr(String target, int defaultPort)@b@  {@b@    int colonIndex = target.indexOf(58);@b@    if ((colonIndex < 0) && (defaultPort == -1))@b@      throw new RuntimeException("Not a host:port pair: " + target);@b@@b@    String hostname = "";@b@    int port = -1;@b@    if (!(target.contains("/")))@b@      if (colonIndex == -1) {@b@        hostname = target;@b@      }@b@      else {@b@        hostname = target.substring(0, colonIndex);@b@        port = Integer.parseInt(target.substring(colonIndex + 1));@b@      }@b@    else@b@      try@b@      {@b@        URI addr = new URI(target);@b@        hostname = addr.getHost();@b@        port = addr.getPort();@b@      } catch (URISyntaxException use) {@b@        LOG.fatal(use);@b@      }@b@@b@@b@    if (port == -1) {@b@      port = defaultPort;@b@    }@b@@b@    if (getStaticResolution(hostname) != null)@b@      hostname = getStaticResolution(hostname);@b@@b@    return new InetSocketAddress(hostname, port);@b@  }@b@@b@  public static void addStaticResolution(String host, String resolvedName)@b@  {@b@    synchronized (hostToResolved) {@b@      hostToResolved.put(host, resolvedName);@b@    }@b@  }@b@@b@  public static String getStaticResolution(String host)@b@  {@b@    synchronized (hostToResolved) {@b@      return ((String)hostToResolved.get(host));@b@    }@b@  }@b@@b@  public static List<String[]> getAllStaticResolutions()@b@  {@b@    synchronized (hostToResolved) {@b@      Set entries = hostToResolved.entrySet();@b@      if (entries.size() != 0) break label28;@b@      return null;@b@@b@      label28: List l = new ArrayList(entries.size());@b@      for (Map.Entry e : entries)@b@        l.add(new String[] { (String)e.getKey(), (String)e.getValue() });@b@@b@      return l;@b@    }@b@  }@b@@b@  public static InetSocketAddress getConnectAddress(Server server)@b@  {@b@    InetSocketAddress addr = server.getListenerAddress();@b@    if (addr.getAddress().getHostAddress().equals("0.0.0.0"))@b@      addr = new InetSocketAddress("127.0.0.1", addr.getPort());@b@@b@    return addr;@b@  }@b@@b@  public static InputStream getInputStream(Socket socket)@b@    throws IOException@b@  {@b@    return getInputStream(socket, socket.getSoTimeout());@b@  }@b@@b@  public static InputStream getInputStream(Socket socket, long timeout)@b@    throws IOException@b@  {@b@    return new SocketInputStream(socket, timeout);@b@  }@b@@b@  public static OutputStream getOutputStream(Socket socket)@b@    throws IOException@b@  {@b@    return getOutputStream(socket, 0L);@b@  }@b@@b@  public static OutputStream getOutputStream(Socket socket, long timeout)@b@    throws IOException@b@  {@b@    return new SocketOutputStream(socket, timeout);@b@  }@b@@b@  public static void connect(Socket socket, SocketAddress endpoint, int timeout) throws IOException {@b@    if ((socket == null) || (endpoint == null) || (timeout < 0)) {@b@      throw new IllegalArgumentException("Illegal argument for connect()");@b@    }@b@@b@    SocketChannel ch = socket.getChannel();@b@@b@    if (ch == null)@b@    {@b@      socket.connect(endpoint, timeout);@b@    }@b@    else SocketIOWithTimeout.connect(ch, endpoint, timeout);@b@  }@b@@b@  public static String normalizeHostName(String name)@b@  {@b@    if (Character.digit(name.charAt(0), 16) != -1)@b@      return name;@b@    try@b@    {@b@      InetAddress ipAddress = InetAddress.getByName(name);@b@      return ipAddress.getHostAddress(); } catch (UnknownHostException e) {@b@    }@b@    return name;@b@  }@b@@b@  public static List<String> normalizeHostNames(Collection<String> names)@b@  {@b@    List hostNames = new ArrayList(names.size());@b@    for (String name : names)@b@      hostNames.add(normalizeHostName(name));@b@@b@    return hostNames;@b@  }@b@}

2.SocketInputStream、SocketOutputStream、SocketIOWithTimeout依赖类

package org.apache.flink.runtime.net;@b@@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.net.Socket;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.ReadableByteChannel;@b@import java.nio.channels.SelectableChannel;@b@@b@public class SocketInputStream extends InputStream@b@  implements ReadableByteChannel@b@{@b@  private Reader reader;@b@@b@  public SocketInputStream(ReadableByteChannel channel, long timeout)@b@    throws IOException@b@  {@b@    SocketIOWithTimeout.checkChannelValidity(channel);@b@    this.reader = new Reader(channel, timeout);@b@  }@b@@b@  public SocketInputStream(Socket socket, long timeout)@b@    throws IOException@b@  {@b@    this(socket.getChannel(), timeout);@b@  }@b@@b@  public SocketInputStream(Socket socket)@b@    throws IOException@b@  {@b@    this(socket.getChannel(), socket.getSoTimeout());@b@  }@b@@b@  public int read()@b@    throws IOException@b@  {@b@    byte[] buf = new byte[1];@b@    int ret = read(buf, 0, 1);@b@    if (ret > 0)@b@      return buf[0];@b@@b@    if (ret != -1)@b@    {@b@      throw new IOException("Could not read from stream");@b@    }@b@    return ret;@b@  }@b@@b@  public int read(byte[] b, int off, int len) throws IOException {@b@    return read(ByteBuffer.wrap(b, off, len));@b@  }@b@@b@  public synchronized void close()@b@    throws IOException@b@  {@b@    this.reader.channel.close();@b@    this.reader.close();@b@  }@b@@b@  public ReadableByteChannel getChannel()@b@  {@b@    return this.reader.channel;@b@  }@b@@b@  public boolean isOpen()@b@  {@b@    return this.reader.isOpen();@b@  }@b@@b@  public int read(ByteBuffer dst) throws IOException {@b@    return this.reader.doIO(dst, 1);@b@  }@b@@b@  public void waitForReadable()@b@    throws IOException@b@  {@b@    this.reader.waitForIO(1);@b@  }@b@@b@  private static class Reader extends SocketIOWithTimeout@b@  {@b@    ReadableByteChannel channel;@b@@b@    Reader(ReadableByteChannel channel, long timeout)@b@      throws IOException@b@    {@b@      super((SelectableChannel)channel, timeout);@b@      this.channel = channel;@b@    }@b@@b@    int performIO(ByteBuffer buf) throws IOException {@b@      return this.channel.read(buf);@b@    }@b@  }@b@}
package org.apache.flink.runtime.net;@b@@b@import java.io.EOFException;@b@import java.io.IOException;@b@import java.io.OutputStream;@b@import java.net.Socket;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.FileChannel;@b@import java.nio.channels.SelectableChannel;@b@import java.nio.channels.WritableByteChannel;@b@@b@public class SocketOutputStream extends OutputStream@b@  implements WritableByteChannel@b@{@b@  private Writer writer;@b@@b@  public SocketOutputStream(WritableByteChannel channel, long timeout)@b@    throws IOException@b@  {@b@    SocketIOWithTimeout.checkChannelValidity(channel);@b@    this.writer = new Writer(channel, timeout);@b@  }@b@@b@  public SocketOutputStream(Socket socket, long timeout)@b@    throws IOException@b@  {@b@    this(socket.getChannel(), timeout);@b@  }@b@@b@  public void write(int b)@b@    throws IOException@b@  {@b@    byte[] buf = new byte[1];@b@    buf[0] = (byte)b;@b@    write(buf, 0, 1);@b@  }@b@@b@  public void write(byte[] b, int off, int len) throws IOException {@b@    ByteBuffer buf = ByteBuffer.wrap(b, off, len);@b@    if (buf.hasRemaining())@b@      try {@b@        if (write(buf) < 0) {@b@          throw new IOException("The stream is closed");@b@        }@b@@b@      }@b@      catch (IOException e)@b@      {@b@        if (buf.capacity() > buf.remaining())@b@          this.writer.close();@b@@b@        throw e;@b@      }@b@  }@b@@b@  public synchronized void close()@b@    throws IOException@b@  {@b@    this.writer.channel.close();@b@    this.writer.close();@b@  }@b@@b@  public WritableByteChannel getChannel()@b@  {@b@    return this.writer.channel;@b@  }@b@@b@  public boolean isOpen()@b@  {@b@    return this.writer.isOpen();@b@  }@b@@b@  public int write(ByteBuffer src) throws IOException {@b@    return this.writer.doIO(src, 4);@b@  }@b@@b@  public void waitForWritable()@b@    throws IOException@b@  {@b@    this.writer.waitForIO(4);@b@  }@b@@b@  public void transferToFully(FileChannel fileCh, long position, int count)@b@    throws IOException@b@  {@b@    while (count > 0)@b@    {@b@      waitForWritable();@b@      int nTransfered = (int)fileCh.transferTo(position, count, getChannel());@b@@b@      if (nTransfered == 0)@b@      {@b@        if (position < fileCh.size()) continue;@b@        throw new EOFException("EOF Reached. file size is " + fileCh.size() + " and " + count + " more bytes left to be " + "transfered.");@b@      }@b@@b@      if (nTransfered < 0)@b@        throw new IOException("Unexpected return of " + nTransfered + " from transferTo()");@b@@b@      position += nTransfered;@b@      count -= nTransfered;@b@    }@b@  }@b@@b@  private static class Writer extends SocketIOWithTimeout@b@  {@b@    WritableByteChannel channel;@b@@b@    Writer(WritableByteChannel channel, long timeout)@b@      throws IOException@b@    {@b@      super((SelectableChannel)channel, timeout);@b@      this.channel = channel;@b@    }@b@@b@    int performIO(ByteBuffer buf) throws IOException {@b@      return this.channel.write(buf);@b@    }@b@  }@b@}
package org.apache.flink.runtime.net;@b@@b@import java.io.IOException;@b@import java.net.SocketAddress;@b@import java.net.SocketTimeoutException;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.SelectableChannel;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.SocketChannel;@b@import java.nio.channels.spi.SelectorProvider;@b@import java.util.Iterator;@b@import java.util.LinkedList;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@@b@abstract class SocketIOWithTimeout@b@{@b@  static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);@b@  private SelectableChannel channel;@b@  private long timeout;@b@  private boolean closed = false;@b@  private static SelectorPool selector = new SelectorPool(null);@b@@b@  SocketIOWithTimeout(SelectableChannel channel, long timeout)@b@    throws IOException@b@  {@b@    checkChannelValidity(channel);@b@@b@    this.channel = channel;@b@    this.timeout = timeout;@b@@b@    channel.configureBlocking(false);@b@  }@b@@b@  void close() {@b@    this.closed = true;@b@  }@b@@b@  boolean isOpen() {@b@    return ((!(this.closed)) && (this.channel.isOpen()));@b@  }@b@@b@  SelectableChannel getChannel() {@b@    return this.channel;@b@  }@b@@b@  static void checkChannelValidity(Object channel)@b@    throws IOException@b@  {@b@    if (channel == null)@b@    {@b@      throw new IOException("Channel is null. Check how the channel or socket is created.");@b@    }@b@@b@    if (!(channel instanceof SelectableChannel))@b@      throw new IOException("Channel should be a SelectableChannel");@b@  }@b@@b@  abstract int performIO(ByteBuffer paramByteBuffer)@b@    throws IOException;@b@@b@  int doIO(ByteBuffer buf, int ops)@b@    throws IOException@b@  {@b@    if (!(buf.hasRemaining())) {@b@      throw new IllegalArgumentException("Buffer has no data left.");@b@    }@b@@b@    while (buf.hasRemaining()) {@b@      if (this.closed)@b@        return -1;@b@@b@      try@b@      {@b@        int n = performIO(buf);@b@        if (n != 0)@b@        {@b@          return n;@b@        }@b@      } catch (IOException e) {@b@        if (!(this.channel.isOpen()))@b@          this.closed = true;@b@@b@        throw e;@b@      }@b@@b@      int count = 0;@b@      try {@b@        count = selector.select(this.channel, ops, this.timeout);@b@      } catch (IOException e) {@b@        this.closed = true;@b@        throw e;@b@      }@b@@b@      if (count == 0) {@b@        throw new SocketTimeoutException(timeoutExceptionString(this.channel, this.timeout, ops));@b@      }@b@@b@    }@b@@b@    return 0;@b@  }@b@@b@  static void connect(SocketChannel channel, SocketAddress endpoint, int timeout)@b@    throws IOException@b@  {@b@    long timeoutLeft;@b@    boolean blockingOn = channel.isBlocking();@b@    if (blockingOn)@b@      channel.configureBlocking(false);@b@@b@    try@b@    {@b@      if (channel.connect(endpoint))@b@      {@b@        return;@b@      }@b@      timeoutLeft = timeout;@b@      long endTime = (timeout > 0) ? System.currentTimeMillis() + timeout : 0L;@b@@b@      int ret = selector.select(channel, 8, timeoutLeft);@b@@b@      if ((ret > 0) && (channel.finishConnect()))@b@      {@b@        return;@b@      }@b@      label143: if (ret != 0) { if (timeout <= 0) {@b@          break label143;@b@        }@b@@b@      }@b@@b@    }@b@    catch (IOException e)@b@    {@b@    }@b@    finally@b@    {@b@      if ((blockingOn) && (channel.isOpen()))@b@        channel.configureBlocking(true);@b@    }@b@  }@b@@b@  void waitForIO(int ops)@b@    throws IOException@b@  {@b@    if (selector.select(this.channel, ops, this.timeout) == 0)@b@      throw new SocketTimeoutException(timeoutExceptionString(this.channel, this.timeout, ops));@b@  }@b@@b@  private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops)@b@  {@b@    String waitingFor;@b@    switch (ops)@b@    {@b@    case 1:@b@      waitingFor = "read";@b@      break;@b@    case 4:@b@      waitingFor = "write";@b@      break;@b@    case 8:@b@      waitingFor = "connect";@b@      break;@b@    default:@b@      waitingFor = "" + ops;@b@    }@b@@b@    return timeout + " millis timeout while " + "waiting for channel to be ready for " + waitingFor + ". ch : " + channel;@b@  }@b@@b@  private static class SelectorPool@b@  {@b@    private static final long IDLE_TIMEOUT = 10000L;@b@    private ProviderInfo providerList;@b@@b@    private SelectorPool()@b@    {@b@      this.providerList = null;@b@    }@b@@b@    int select(SelectableChannel channel, int ops, long timeout)@b@      throws IOException@b@    {@b@      int i;@b@      SelectorInfo info = get(channel);@b@@b@      SelectionKey key = null;@b@      int ret = 0;@b@      try@b@      {@b@        long start = (timeout == 0L) ? 0L : System.currentTimeMillis();@b@@b@        key = channel.register(info.selector, ops);@b@        ret = info.selector.select(timeout);@b@@b@        if (ret != 0) {@b@          i = ret;@b@@b@          return i;@b@        }@b@        if (timeout > 0L) {@b@          timeout -= System.currentTimeMillis() - start;@b@          if (timeout <= 0L) {@b@            i = 0;@b@@b@            if (key != null) {@b@              key.cancel();@b@            }@b@@b@            try@b@            {@b@              info.selector.selectNow();@b@            } catch (IOException e) {@b@              SocketIOWithTimeout.LOG.info("Unexpected Exception while clearing selector : " + e.toString());@b@@b@              info.close();@b@              return ret;@b@            }@b@          }@b@        }@b@      }@b@      finally@b@      {@b@        if (key != null) {@b@          key.cancel();@b@        }@b@@b@        try@b@        {@b@          info.selector.selectNow();@b@        } catch (IOException e) {@b@          SocketIOWithTimeout.LOG.info("Unexpected Exception while clearing selector : " + e.toString());@b@@b@          info.close();@b@          return ret;@b@        }@b@@b@        release(info);@b@      }@b@    }@b@@b@    private synchronized SelectorInfo get(SelectableChannel channel)@b@      throws IOException@b@    {@b@      SelectorInfo selInfo = null;@b@@b@      SelectorProvider provider = channel.provider();@b@@b@      ProviderInfo pList = this.providerList;@b@      while ((pList != null) && (pList.provider != provider))@b@        pList = pList.next;@b@@b@      if (pList == null)@b@      {@b@        pList = new ProviderInfo(null);@b@        pList.provider = provider;@b@        pList.queue = new LinkedList();@b@        pList.next = this.providerList;@b@        this.providerList = pList;@b@      }@b@@b@      LinkedList queue = pList.queue;@b@@b@      if (queue.isEmpty()) {@b@        Selector selector = provider.openSelector();@b@        selInfo = new SelectorInfo(null);@b@        selInfo.selector = selector;@b@        selInfo.queue = queue;@b@      } else {@b@        selInfo = (SelectorInfo)queue.removeLast();@b@      }@b@@b@      trimIdleSelectors(System.currentTimeMillis());@b@      return selInfo;@b@    }@b@@b@    private synchronized void release(SelectorInfo info)@b@    {@b@      long now = System.currentTimeMillis();@b@      trimIdleSelectors(now);@b@      info.lastActivityTime = now;@b@      info.queue.addLast(info);@b@    }@b@@b@    private void trimIdleSelectors(long now)@b@    {@b@      long cutoff = now - 10000L;@b@@b@      for (ProviderInfo pList = this.providerList; pList != null; pList = pList.next) {@b@        if (pList.queue.isEmpty())@b@          break label91:@b@@b@        for (Iterator it = pList.queue.iterator(); it.hasNext(); ) {@b@          SelectorInfo info = (SelectorInfo)it.next();@b@          if (info.lastActivityTime > cutoff)@b@            break;@b@@b@          it.remove();@b@          info.close();@b@        }@b@      }@b@      label91:@b@    }@b@@b@    private static class ProviderInfo@b@    {@b@      SelectorProvider provider;@b@      LinkedList<SocketIOWithTimeout.SelectorPool.SelectorInfo> queue;@b@      ProviderInfo next;@b@    }@b@@b@    private static class SelectorInfo@b@    {@b@      Selector selector;@b@      long lastActivityTime;@b@      LinkedList<SelectorInfo> queue;@b@@b@      void close()@b@      {@b@        if (this.selector != null)@b@          try {@b@            this.selector.close();@b@          } catch (IOException e) {@b@            SocketIOWithTimeout.LOG.warn("Unexpected exception while closing selector : " + e.toString());@b@          }@b@      }@b@    }@b@  }@b@}