一、前言
关于flink-core源码包中的org.apache.flink.util.NetUtils网络工具类,对涉及网络组件用的域名URL的抽取、端口验证getAvailablePort等。
二、源码说明
package org.apache.flink.util;@b@@b@import org.apache.flink.annotation.Internal;@b@import org.apache.flink.configuration.IllegalConfigurationException;@b@import org.slf4j.Logger;@b@import org.slf4j.LoggerFactory;@b@import sun.net.util.IPAddressUtil;@b@import java.io.IOException;@b@import java.net.Inet4Address;@b@import java.net.Inet6Address;@b@import java.net.InetAddress;@b@import java.net.InetSocketAddress;@b@import java.net.MalformedURLException;@b@import java.net.ServerSocket;@b@import java.net.URL;@b@import java.net.UnknownHostException;@b@import java.util.Arrays;@b@import java.util.Collections;@b@import java.util.Iterator;@b@@b@@Internal@b@public class NetUtils {@b@@b@ private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);@b@@b@ /** The wildcard address to listen on all interfaces (either 0.0.0.0 or ::) */@b@ private static final String WILDCARD_ADDRESS = new InetSocketAddress(0).getAddress().getHostAddress();@b@ @b@ /**@b@ * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts@b@ * (separated by a period '.'), it will take the first part. Otherwise it takes the entire fqdn.@b@ * @b@ * @param fqdn The fully qualified domain name.@b@ * @return The hostname.@b@ */@b@ public static String getHostnameFromFQDN(String fqdn) {@b@ if (fqdn == null) {@b@ throw new IllegalArgumentException("fqdn is null");@b@ }@b@ int dotPos = fqdn.indexOf('.');@b@ if(dotPos == -1) {@b@ return fqdn;@b@ } else {@b@ return fqdn.substring(0, dotPos);@b@ }@b@ }@b@@b@ /**@b@ * Method to validate if the given String represents a hostname:port.@b@ *@b@ * Works also for ipv6.@b@ *@b@ * See: http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress@b@ *@b@ * @return URL object for accessing host and Port@b@ */@b@ public static URL getCorrectHostnamePort(String hostPort) {@b@ try {@b@ URL u = new URL("http://"+hostPort);@b@ if(u.getHost() == null) {@b@ throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid host");@b@ }@b@ if(u.getPort() == -1) {@b@ throw new IllegalArgumentException("The given host:port ('"+hostPort+"') doesn't contain a valid port");@b@ }@b@ return u;@b@ } catch (MalformedURLException e) {@b@ throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);@b@ }@b@ }@b@@b@ // ------------------------------------------------------------------------@b@ // Lookup of to free ports@b@ // ------------------------------------------------------------------------@b@ @b@ /**@b@ * Find a non-occupied port.@b@ *@b@ * @return A non-occupied port.@b@ */@b@ public static int getAvailablePort() {@b@ for (int i = 0; i < 50; i++) {@b@ try (ServerSocket serverSocket = new ServerSocket(0)) {@b@ int port = serverSocket.getLocalPort();@b@ if (port != 0) {@b@ return port;@b@ }@b@ }@b@ catch (IOException ignored) {}@b@ }@b@@b@ throw new RuntimeException("Could not find a free permitted port on the machine.");@b@ }@b@ @b@@b@ // ------------------------------------------------------------------------@b@ // Encoding of IP addresses for URLs@b@ // ------------------------------------------------------------------------@b@@b@ /**@b@ * Returns an address in a normalized format for Akka.@b@ * When an IPv6 address is specified, it normalizes the IPv6 address to avoid@b@ * complications with the exact URL match policy of Akka.@b@ * @param host The hostname, IPv4 or IPv6 address@b@ * @return host which will be normalized if it is an IPv6 address@b@ */@b@ public static String unresolvedHostToNormalizedString(String host) {@b@ // Return loopback interface address if host is null@b@ // This represents the behavior of {@code InetAddress.getByName } and RFC 3330@b@ if (host == null) {@b@ host = InetAddress.getLoopbackAddress().getHostAddress();@b@ } else {@b@ host = host.trim().toLowerCase();@b@ }@b@@b@ // normalize and valid address@b@ if (IPAddressUtil.isIPv6LiteralAddress(host)) {@b@ byte[] ipV6Address = IPAddressUtil.textToNumericFormatV6(host);@b@ host = getIPv6UrlRepresentation(ipV6Address);@b@ } else if (!IPAddressUtil.isIPv4LiteralAddress(host)) {@b@ try {@b@ // We don't allow these in hostnames@b@ Preconditions.checkArgument(!host.startsWith("."));@b@ Preconditions.checkArgument(!host.endsWith("."));@b@ Preconditions.checkArgument(!host.contains(":"));@b@ } catch (Exception e) {@b@ throw new IllegalConfigurationException("The configured hostname is not valid", e);@b@ }@b@ }@b@@b@ return host;@b@ }@b@@b@ /**@b@ * Returns a valid address for Akka. It returns a String of format 'host:port'.@b@ * When an IPv6 address is specified, it normalizes the IPv6 address to avoid@b@ * complications with the exact URL match policy of Akka.@b@ * @param host The hostname, IPv4 or IPv6 address@b@ * @param port The port@b@ * @return host:port where host will be normalized if it is an IPv6 address@b@ */@b@ public static String unresolvedHostAndPortToNormalizedString(String host, int port) {@b@ Preconditions.checkArgument(port >= 0 && port < 65536,@b@ "Port is not within the valid range,");@b@ return unresolvedHostToNormalizedString(host) + ":" + port;@b@ }@b@@b@ /**@b@ * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses@b@ * have the proper formatting to be included in URLs.@b@ * @b@ * @param address The IP address to encode.@b@ * @return The proper URL string encoded IP address.@b@ */@b@ public static String ipAddressToUrlString(InetAddress address) {@b@ if (address == null) {@b@ throw new NullPointerException("address is null");@b@ }@b@ else if (address instanceof Inet4Address) {@b@ return address.getHostAddress();@b@ }@b@ else if (address instanceof Inet6Address) {@b@ return getIPv6UrlRepresentation((Inet6Address) address);@b@ }@b@ else {@b@ throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);@b@ }@b@ }@b@@b@ /**@b@ * Encodes an IP address and port to be included in URL. in particular, this method makes@b@ * sure that IPv6 addresses have the proper formatting to be included in URLs.@b@ *@b@ * @param address The address to be included in the URL.@b@ * @param port The port for the URL address.@b@ * @return The proper URL string encoded IP address and port.@b@ */@b@ public static String ipAddressAndPortToUrlString(InetAddress address, int port) {@b@ return ipAddressToUrlString(address) + ':' + port;@b@ }@b@@b@ /**@b@ * Encodes an IP address and port to be included in URL. in particular, this method makes@b@ * sure that IPv6 addresses have the proper formatting to be included in URLs.@b@ * @b@ * @param address The socket address with the IP address and port.@b@ * @return The proper URL string encoded IP address and port.@b@ */@b@ public static String socketAddressToUrlString(InetSocketAddress address) {@b@ if (address.isUnresolved()) {@b@ throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());@b@ }@b@ return ipAddressAndPortToUrlString(address.getAddress(), address.getPort());@b@ }@b@@b@ /**@b@ * Normalizes and encodes a hostname and port to be included in URL. @b@ * In particular, this method makes sure that IPv6 address literals have the proper@b@ * formatting to be included in URLs.@b@ *@b@ * @param host The address to be included in the URL.@b@ * @param port The port for the URL address.@b@ * @return The proper URL string encoded IP address and port.@b@ * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.@b@ */@b@ public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException {@b@ return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);@b@ }@b@@b@ /**@b@ * Creates a compressed URL style representation of an Inet6Address.@b@ *@b@ * <p>This method copies and adopts code from Google's Guava library.@b@ * We re-implement this here in order to reduce dependency on Guava.@b@ * The Guava library has frequently caused dependency conflicts in the past.@b@ */@b@ private static String getIPv6UrlRepresentation(Inet6Address address) {@b@ return getIPv6UrlRepresentation(address.getAddress());@b@ }@b@@b@ /**@b@ * Creates a compressed URL style representation of an Inet6Address.@b@ *@b@ * <p>This method copies and adopts code from Google's Guava library.@b@ * We re-implement this here in order to reduce dependency on Guava.@b@ * The Guava library has frequently caused dependency conflicts in the past.@b@ */@b@ private static String getIPv6UrlRepresentation(byte[] addressBytes) {@b@ // first, convert bytes to 16 bit chunks@b@ int[] hextets = new int[8];@b@ for (int i = 0; i < hextets.length; i++) {@b@ hextets[i] = (addressBytes[2 * i] & 0xFF) << 8 | (addressBytes[2 * i + 1] & 0xFF);@b@ }@b@@b@ // now, find the sequence of zeros that should be compressed@b@ int bestRunStart = -1;@b@ int bestRunLength = -1;@b@ int runStart = -1;@b@ for (int i = 0; i < hextets.length + 1; i++) {@b@ if (i < hextets.length && hextets[i] == 0) {@b@ if (runStart < 0) {@b@ runStart = i;@b@ }@b@ } else if (runStart >= 0) {@b@ int runLength = i - runStart;@b@ if (runLength > bestRunLength) {@b@ bestRunStart = runStart;@b@ bestRunLength = runLength;@b@ }@b@ runStart = -1;@b@ }@b@ }@b@ if (bestRunLength >= 2) {@b@ Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);@b@ }@b@@b@ // convert into text form@b@ StringBuilder buf = new StringBuilder(40);@b@ buf.append('[');@b@ @b@ boolean lastWasNumber = false;@b@ for (int i = 0; i < hextets.length; i++) {@b@ boolean thisIsNumber = hextets[i] >= 0;@b@ if (thisIsNumber) {@b@ if (lastWasNumber) {@b@ buf.append(':');@b@ }@b@ buf.append(Integer.toHexString(hextets[i]));@b@ } else {@b@ if (i == 0 || lastWasNumber) {@b@ buf.append("::");@b@ }@b@ }@b@ lastWasNumber = thisIsNumber;@b@ }@b@ buf.append(']');@b@ return buf.toString();@b@ }@b@ @b@ // ------------------------------------------------------------------------@b@ // Port range parsing@b@ // ------------------------------------------------------------------------@b@ @b@ /**@b@ * Returns an iterator over available ports defined by the range definition.@b@ *@b@ * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.@b@ * @return Set of ports from the range definition@b@ * @throws NumberFormatException If an invalid string is passed.@b@ */@b@ public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {@b@ final String[] ranges = rangeDefinition.trim().split(",");@b@ @b@ UnionIterator<Integer> iterators = new UnionIterator<>();@b@ @b@ for (String rawRange: ranges) {@b@ Iterator<Integer> rangeIterator;@b@ String range = rawRange.trim();@b@ int dashIdx = range.indexOf('-');@b@ if (dashIdx == -1) {@b@ // only one port in range:@b@ final int port = Integer.valueOf(range);@b@ if (port < 0 || port > 65535) {@b@ throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +@b@ "and 65535, but was " + port + ".");@b@ }@b@ rangeIterator = Collections.singleton(Integer.valueOf(range)).iterator();@b@ } else {@b@ // evaluate range@b@ final int start = Integer.valueOf(range.substring(0, dashIdx));@b@ if (start < 0 || start > 65535) {@b@ throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +@b@ "and 65535, but was " + start + ".");@b@ }@b@ final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));@b@ if (end < 0 || end > 65535) {@b@ throw new IllegalConfigurationException("Invalid port configuration. Port must be between 0" +@b@ "and 65535, but was " + end + ".");@b@ }@b@ rangeIterator = new Iterator<Integer>() {@b@ int i = start;@b@ @Override@b@ public boolean hasNext() {@b@ return i <= end;@b@ }@b@@b@ @Override@b@ public Integer next() {@b@ return i++;@b@ }@b@@b@ @Override@b@ public void remove() {@b@ throw new UnsupportedOperationException("Remove not supported");@b@ }@b@ };@b@ }@b@ iterators.add(rangeIterator);@b@ }@b@ @b@ return iterators;@b@ }@b@@b@ /**@b@ * Tries to allocate a socket from the given sets of ports.@b@ *@b@ * @param portsIterator A set of ports to choose from.@b@ * @param factory A factory for creating the SocketServer@b@ * @return null if no port was available or an allocated socket.@b@ */@b@ public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) {@b@ while (portsIterator.hasNext()) {@b@ int port = portsIterator.next();@b@ LOG.debug("Trying to open socket on port {}", port);@b@ try {@b@ return factory.createSocket(port);@b@ } catch (IOException | IllegalArgumentException e) {@b@ if (LOG.isDebugEnabled()) {@b@ LOG.debug("Unable to allocate socket on port", e);@b@ } else {@b@ LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());@b@ }@b@ }@b@ }@b@ return null;@b@ }@b@@b@ /**@b@ * Returns the wildcard address to listen on all interfaces.@b@ * @return Either 0.0.0.0 or :: depending on the IP setup.@b@ */@b@ public static String getWildcardIPAddress() {@b@ return WILDCARD_ADDRESS;@b@ }@b@@b@ public interface SocketFactory {@b@ ServerSocket createSocket(int port) throws IOException;@b@ }@b@}