一、前言
本文介绍flink源码包中的org.apache.flink.runtime.net.NetUtils网络工具类,它提供获取网络地址InetSocketAddress、获取Socket的输入流InputStream和输出流OutputStream、基于SocketChannel建立Socket连接等功能。
二、源码说明
1.NetUtils工具类
package org.apache.flink.runtime.net;@b@@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.net.InetAddress;@b@import java.net.InetSocketAddress;@b@import java.net.Socket;@b@import java.net.SocketAddress;@b@import java.net.URI;@b@import java.net.URISyntaxException;@b@import java.net.UnknownHostException;@b@import java.nio.channels.SocketChannel;@b@import java.util.ArrayList;@b@import java.util.Collection;@b@import java.util.HashMap;@b@import java.util.List;@b@import java.util.Map;@b@import java.util.Map.Entry;@b@import java.util.Set;@b@import javax.net.SocketFactory;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@import org.apache.flink.runtime.ipc.Server;@b@@b@public class NetUtils@b@{@b@ private static final Log LOG = LogFactory.getLog(NetUtils.class);@b@ private static Map<String, String> hostToResolved = new HashMap();@b@@b@ public static SocketFactory getSocketFactory()@b@ {@b@ return getDefaultSocketFactory();@b@ }@b@@b@ public static SocketFactory getDefaultSocketFactory()@b@ {@b@ return SocketFactory.getDefault();@b@ }@b@@b@ public static InetSocketAddress createSocketAddr(String target)@b@ {@b@ return createSocketAddr(target, -1);@b@ }@b@@b@ public static InetSocketAddress createSocketAddr(String target, int defaultPort)@b@ {@b@ int colonIndex = target.indexOf(58);@b@ if ((colonIndex < 0) && (defaultPort == -1))@b@ throw new RuntimeException("Not a host:port pair: " + target);@b@@b@ String hostname = "";@b@ int port = -1;@b@ if (!(target.contains("/")))@b@ if (colonIndex == -1) {@b@ hostname = target;@b@ }@b@ else {@b@ hostname = target.substring(0, colonIndex);@b@ port = Integer.parseInt(target.substring(colonIndex + 1));@b@ }@b@ else@b@ try@b@ {@b@ URI addr = new URI(target);@b@ hostname = addr.getHost();@b@ port = addr.getPort();@b@ } catch (URISyntaxException use) {@b@ LOG.fatal(use);@b@ }@b@@b@@b@ if (port == -1) {@b@ port = defaultPort;@b@ }@b@@b@ if 
(getStaticResolution(hostname) != null)@b@ hostname = getStaticResolution(hostname);@b@@b@ return new InetSocketAddress(hostname, port);@b@ }@b@@b@ public static void addStaticResolution(String host, String resolvedName)@b@ {@b@ synchronized (hostToResolved) {@b@ hostToResolved.put(host, resolvedName);@b@ }@b@ }@b@@b@ public static String getStaticResolution(String host)@b@ {@b@ synchronized (hostToResolved) {@b@ return ((String)hostToResolved.get(host));@b@ }@b@ }@b@@b@ public static List<String[]> getAllStaticResolutions()@b@ {@b@ synchronized (hostToResolved) {@b@ Set entries = hostToResolved.entrySet();@b@ if (entries.size() != 0) break label28;@b@ return null;@b@@b@ label28: List l = new ArrayList(entries.size());@b@ for (Map.Entry e : entries)@b@ l.add(new String[] { (String)e.getKey(), (String)e.getValue() });@b@@b@ return l;@b@ }@b@ }@b@@b@ public static InetSocketAddress getConnectAddress(Server server)@b@ {@b@ InetSocketAddress addr = server.getListenerAddress();@b@ if (addr.getAddress().getHostAddress().equals("0.0.0.0"))@b@ addr = new InetSocketAddress("127.0.0.1", addr.getPort());@b@@b@ return addr;@b@ }@b@@b@ public static InputStream getInputStream(Socket socket)@b@ throws IOException@b@ {@b@ return getInputStream(socket, socket.getSoTimeout());@b@ }@b@@b@ public static InputStream getInputStream(Socket socket, long timeout)@b@ throws IOException@b@ {@b@ return new SocketInputStream(socket, timeout);@b@ }@b@@b@ public static OutputStream getOutputStream(Socket socket)@b@ throws IOException@b@ {@b@ return getOutputStream(socket, 0L);@b@ }@b@@b@ public static OutputStream getOutputStream(Socket socket, long timeout)@b@ throws IOException@b@ {@b@ return new SocketOutputStream(socket, timeout);@b@ }@b@@b@ public static void connect(Socket socket, SocketAddress endpoint, int timeout) throws IOException {@b@ if ((socket == null) || (endpoint == null) || (timeout < 0)) {@b@ throw new IllegalArgumentException("Illegal argument for connect()");@b@ }@b@@b@ 
SocketChannel ch = socket.getChannel();@b@@b@ if (ch == null)@b@ {@b@ socket.connect(endpoint, timeout);@b@ }@b@ else SocketIOWithTimeout.connect(ch, endpoint, timeout);@b@ }@b@@b@ public static String normalizeHostName(String name)@b@ {@b@ if (Character.digit(name.charAt(0), 16) != -1)@b@ return name;@b@ try@b@ {@b@ InetAddress ipAddress = InetAddress.getByName(name);@b@ return ipAddress.getHostAddress(); } catch (UnknownHostException e) {@b@ }@b@ return name;@b@ }@b@@b@ public static List<String> normalizeHostNames(Collection<String> names)@b@ {@b@ List hostNames = new ArrayList(names.size());@b@ for (String name : names)@b@ hostNames.add(normalizeHostName(name));@b@@b@ return hostNames;@b@ }@b@}
2.SocketInputStream、SocketOutputStream、SocketIOWithTimeout依赖类
package org.apache.flink.runtime.net;@b@@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.net.Socket;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.ReadableByteChannel;@b@import java.nio.channels.SelectableChannel;@b@@b@public class SocketInputStream extends InputStream@b@ implements ReadableByteChannel@b@{@b@ private Reader reader;@b@@b@ public SocketInputStream(ReadableByteChannel channel, long timeout)@b@ throws IOException@b@ {@b@ SocketIOWithTimeout.checkChannelValidity(channel);@b@ this.reader = new Reader(channel, timeout);@b@ }@b@@b@ public SocketInputStream(Socket socket, long timeout)@b@ throws IOException@b@ {@b@ this(socket.getChannel(), timeout);@b@ }@b@@b@ public SocketInputStream(Socket socket)@b@ throws IOException@b@ {@b@ this(socket.getChannel(), socket.getSoTimeout());@b@ }@b@@b@ public int read()@b@ throws IOException@b@ {@b@ byte[] buf = new byte[1];@b@ int ret = read(buf, 0, 1);@b@ if (ret > 0)@b@ return buf[0];@b@@b@ if (ret != -1)@b@ {@b@ throw new IOException("Could not read from stream");@b@ }@b@ return ret;@b@ }@b@@b@ public int read(byte[] b, int off, int len) throws IOException {@b@ return read(ByteBuffer.wrap(b, off, len));@b@ }@b@@b@ public synchronized void close()@b@ throws IOException@b@ {@b@ this.reader.channel.close();@b@ this.reader.close();@b@ }@b@@b@ public ReadableByteChannel getChannel()@b@ {@b@ return this.reader.channel;@b@ }@b@@b@ public boolean isOpen()@b@ {@b@ return this.reader.isOpen();@b@ }@b@@b@ public int read(ByteBuffer dst) throws IOException {@b@ return this.reader.doIO(dst, 1);@b@ }@b@@b@ public void waitForReadable()@b@ throws IOException@b@ {@b@ this.reader.waitForIO(1);@b@ }@b@@b@ private static class Reader extends SocketIOWithTimeout@b@ {@b@ ReadableByteChannel channel;@b@@b@ Reader(ReadableByteChannel channel, long timeout)@b@ throws IOException@b@ {@b@ super((SelectableChannel)channel, timeout);@b@ this.channel = channel;@b@ }@b@@b@ int performIO(ByteBuffer buf) throws 
IOException {@b@ return this.channel.read(buf);@b@ }@b@ }@b@}
package org.apache.flink.runtime.net;@b@@b@import java.io.EOFException;@b@import java.io.IOException;@b@import java.io.OutputStream;@b@import java.net.Socket;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.FileChannel;@b@import java.nio.channels.SelectableChannel;@b@import java.nio.channels.WritableByteChannel;@b@@b@public class SocketOutputStream extends OutputStream@b@ implements WritableByteChannel@b@{@b@ private Writer writer;@b@@b@ public SocketOutputStream(WritableByteChannel channel, long timeout)@b@ throws IOException@b@ {@b@ SocketIOWithTimeout.checkChannelValidity(channel);@b@ this.writer = new Writer(channel, timeout);@b@ }@b@@b@ public SocketOutputStream(Socket socket, long timeout)@b@ throws IOException@b@ {@b@ this(socket.getChannel(), timeout);@b@ }@b@@b@ public void write(int b)@b@ throws IOException@b@ {@b@ byte[] buf = new byte[1];@b@ buf[0] = (byte)b;@b@ write(buf, 0, 1);@b@ }@b@@b@ public void write(byte[] b, int off, int len) throws IOException {@b@ ByteBuffer buf = ByteBuffer.wrap(b, off, len);@b@ if (buf.hasRemaining())@b@ try {@b@ if (write(buf) < 0) {@b@ throw new IOException("The stream is closed");@b@ }@b@@b@ }@b@ catch (IOException e)@b@ {@b@ if (buf.capacity() > buf.remaining())@b@ this.writer.close();@b@@b@ throw e;@b@ }@b@ }@b@@b@ public synchronized void close()@b@ throws IOException@b@ {@b@ this.writer.channel.close();@b@ this.writer.close();@b@ }@b@@b@ public WritableByteChannel getChannel()@b@ {@b@ return this.writer.channel;@b@ }@b@@b@ public boolean isOpen()@b@ {@b@ return this.writer.isOpen();@b@ }@b@@b@ public int write(ByteBuffer src) throws IOException {@b@ return this.writer.doIO(src, 4);@b@ }@b@@b@ public void waitForWritable()@b@ throws IOException@b@ {@b@ this.writer.waitForIO(4);@b@ }@b@@b@ public void transferToFully(FileChannel fileCh, long position, int count)@b@ throws IOException@b@ {@b@ while (count > 0)@b@ {@b@ waitForWritable();@b@ int nTransfered = (int)fileCh.transferTo(position, count, 
getChannel());@b@@b@ if (nTransfered == 0)@b@ {@b@ if (position < fileCh.size()) continue;@b@ throw new EOFException("EOF Reached. file size is " + fileCh.size() + " and " + count + " more bytes left to be " + "transfered.");@b@ }@b@@b@ if (nTransfered < 0)@b@ throw new IOException("Unexpected return of " + nTransfered + " from transferTo()");@b@@b@ position += nTransfered;@b@ count -= nTransfered;@b@ }@b@ }@b@@b@ private static class Writer extends SocketIOWithTimeout@b@ {@b@ WritableByteChannel channel;@b@@b@ Writer(WritableByteChannel channel, long timeout)@b@ throws IOException@b@ {@b@ super((SelectableChannel)channel, timeout);@b@ this.channel = channel;@b@ }@b@@b@ int performIO(ByteBuffer buf) throws IOException {@b@ return this.channel.write(buf);@b@ }@b@ }@b@}
package org.apache.flink.runtime.net;@b@@b@import java.io.IOException;@b@import java.net.SocketAddress;@b@import java.net.SocketTimeoutException;@b@import java.nio.ByteBuffer;@b@import java.nio.channels.SelectableChannel;@b@import java.nio.channels.SelectionKey;@b@import java.nio.channels.Selector;@b@import java.nio.channels.SocketChannel;@b@import java.nio.channels.spi.SelectorProvider;@b@import java.util.Iterator;@b@import java.util.LinkedList;@b@import org.apache.commons.logging.Log;@b@import org.apache.commons.logging.LogFactory;@b@@b@abstract class SocketIOWithTimeout@b@{@b@ static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);@b@ private SelectableChannel channel;@b@ private long timeout;@b@ private boolean closed = false;@b@ private static SelectorPool selector = new SelectorPool(null);@b@@b@ SocketIOWithTimeout(SelectableChannel channel, long timeout)@b@ throws IOException@b@ {@b@ checkChannelValidity(channel);@b@@b@ this.channel = channel;@b@ this.timeout = timeout;@b@@b@ channel.configureBlocking(false);@b@ }@b@@b@ void close() {@b@ this.closed = true;@b@ }@b@@b@ boolean isOpen() {@b@ return ((!(this.closed)) && (this.channel.isOpen()));@b@ }@b@@b@ SelectableChannel getChannel() {@b@ return this.channel;@b@ }@b@@b@ static void checkChannelValidity(Object channel)@b@ throws IOException@b@ {@b@ if (channel == null)@b@ {@b@ throw new IOException("Channel is null. 
Check how the channel or socket is created.");@b@ }@b@@b@ if (!(channel instanceof SelectableChannel))@b@ throw new IOException("Channel should be a SelectableChannel");@b@ }@b@@b@ abstract int performIO(ByteBuffer paramByteBuffer)@b@ throws IOException;@b@@b@ int doIO(ByteBuffer buf, int ops)@b@ throws IOException@b@ {@b@ if (!(buf.hasRemaining())) {@b@ throw new IllegalArgumentException("Buffer has no data left.");@b@ }@b@@b@ while (buf.hasRemaining()) {@b@ if (this.closed)@b@ return -1;@b@@b@ try@b@ {@b@ int n = performIO(buf);@b@ if (n != 0)@b@ {@b@ return n;@b@ }@b@ } catch (IOException e) {@b@ if (!(this.channel.isOpen()))@b@ this.closed = true;@b@@b@ throw e;@b@ }@b@@b@ int count = 0;@b@ try {@b@ count = selector.select(this.channel, ops, this.timeout);@b@ } catch (IOException e) {@b@ this.closed = true;@b@ throw e;@b@ }@b@@b@ if (count == 0) {@b@ throw new SocketTimeoutException(timeoutExceptionString(this.channel, this.timeout, ops));@b@ }@b@@b@ }@b@@b@ return 0;@b@ }@b@@b@ static void connect(SocketChannel channel, SocketAddress endpoint, int timeout)@b@ throws IOException@b@ {@b@ long timeoutLeft;@b@ boolean blockingOn = channel.isBlocking();@b@ if (blockingOn)@b@ channel.configureBlocking(false);@b@@b@ try@b@ {@b@ if (channel.connect(endpoint))@b@ {@b@ return;@b@ }@b@ timeoutLeft = timeout;@b@ long endTime = (timeout > 0) ? 
System.currentTimeMillis() + timeout : 0L;@b@@b@ int ret = selector.select(channel, 8, timeoutLeft);@b@@b@ if ((ret > 0) && (channel.finishConnect()))@b@ {@b@ return;@b@ }@b@ label143: if (ret != 0) { if (timeout <= 0) {@b@ break label143;@b@ }@b@@b@ }@b@@b@ }@b@ catch (IOException e)@b@ {@b@ }@b@ finally@b@ {@b@ if ((blockingOn) && (channel.isOpen()))@b@ channel.configureBlocking(true);@b@ }@b@ }@b@@b@ void waitForIO(int ops)@b@ throws IOException@b@ {@b@ if (selector.select(this.channel, ops, this.timeout) == 0)@b@ throw new SocketTimeoutException(timeoutExceptionString(this.channel, this.timeout, ops));@b@ }@b@@b@ private static String timeoutExceptionString(SelectableChannel channel, long timeout, int ops)@b@ {@b@ String waitingFor;@b@ switch (ops)@b@ {@b@ case 1:@b@ waitingFor = "read";@b@ break;@b@ case 4:@b@ waitingFor = "write";@b@ break;@b@ case 8:@b@ waitingFor = "connect";@b@ break;@b@ default:@b@ waitingFor = "" + ops;@b@ }@b@@b@ return timeout + " millis timeout while " + "waiting for channel to be ready for " + waitingFor + ". ch : " + channel;@b@ }@b@@b@ private static class SelectorPool@b@ {@b@ private static final long IDLE_TIMEOUT = 10000L;@b@ private ProviderInfo providerList;@b@@b@ private SelectorPool()@b@ {@b@ this.providerList = null;@b@ }@b@@b@ int select(SelectableChannel channel, int ops, long timeout)@b@ throws IOException@b@ {@b@ int i;@b@ SelectorInfo info = get(channel);@b@@b@ SelectionKey key = null;@b@ int ret = 0;@b@ try@b@ {@b@ long start = (timeout == 0L) ? 
0L : System.currentTimeMillis();@b@@b@ key = channel.register(info.selector, ops);@b@ ret = info.selector.select(timeout);@b@@b@ if (ret != 0) {@b@ i = ret;@b@@b@ return i;@b@ }@b@ if (timeout > 0L) {@b@ timeout -= System.currentTimeMillis() - start;@b@ if (timeout <= 0L) {@b@ i = 0;@b@@b@ if (key != null) {@b@ key.cancel();@b@ }@b@@b@ try@b@ {@b@ info.selector.selectNow();@b@ } catch (IOException e) {@b@ SocketIOWithTimeout.LOG.info("Unexpected Exception while clearing selector : " + e.toString());@b@@b@ info.close();@b@ return ret;@b@ }@b@ }@b@ }@b@ }@b@ finally@b@ {@b@ if (key != null) {@b@ key.cancel();@b@ }@b@@b@ try@b@ {@b@ info.selector.selectNow();@b@ } catch (IOException e) {@b@ SocketIOWithTimeout.LOG.info("Unexpected Exception while clearing selector : " + e.toString());@b@@b@ info.close();@b@ return ret;@b@ }@b@@b@ release(info);@b@ }@b@ }@b@@b@ private synchronized SelectorInfo get(SelectableChannel channel)@b@ throws IOException@b@ {@b@ SelectorInfo selInfo = null;@b@@b@ SelectorProvider provider = channel.provider();@b@@b@ ProviderInfo pList = this.providerList;@b@ while ((pList != null) && (pList.provider != provider))@b@ pList = pList.next;@b@@b@ if (pList == null)@b@ {@b@ pList = new ProviderInfo(null);@b@ pList.provider = provider;@b@ pList.queue = new LinkedList();@b@ pList.next = this.providerList;@b@ this.providerList = pList;@b@ }@b@@b@ LinkedList queue = pList.queue;@b@@b@ if (queue.isEmpty()) {@b@ Selector selector = provider.openSelector();@b@ selInfo = new SelectorInfo(null);@b@ selInfo.selector = selector;@b@ selInfo.queue = queue;@b@ } else {@b@ selInfo = (SelectorInfo)queue.removeLast();@b@ }@b@@b@ trimIdleSelectors(System.currentTimeMillis());@b@ return selInfo;@b@ }@b@@b@ private synchronized void release(SelectorInfo info)@b@ {@b@ long now = System.currentTimeMillis();@b@ trimIdleSelectors(now);@b@ info.lastActivityTime = now;@b@ info.queue.addLast(info);@b@ }@b@@b@ private void trimIdleSelectors(long now)@b@ {@b@ long cutoff = now 
- 10000L;@b@@b@ for (ProviderInfo pList = this.providerList; pList != null; pList = pList.next) {@b@ if (pList.queue.isEmpty())@b@ break label91:@b@@b@ for (Iterator it = pList.queue.iterator(); it.hasNext(); ) {@b@ SelectorInfo info = (SelectorInfo)it.next();@b@ if (info.lastActivityTime > cutoff)@b@ break;@b@@b@ it.remove();@b@ info.close();@b@ }@b@ }@b@ label91:@b@ }@b@@b@ private static class ProviderInfo@b@ {@b@ SelectorProvider provider;@b@ LinkedList<SocketIOWithTimeout.SelectorPool.SelectorInfo> queue;@b@ ProviderInfo next;@b@ }@b@@b@ private static class SelectorInfo@b@ {@b@ Selector selector;@b@ long lastActivityTime;@b@ LinkedList<SelectorInfo> queue;@b@@b@ void close()@b@ {@b@ if (this.selector != null)@b@ try {@b@ this.selector.close();@b@ } catch (IOException e) {@b@ SocketIOWithTimeout.LOG.warn("Unexpected exception while closing selector : " + e.toString());@b@ }@b@ }@b@ }@b@ }@b@}