一、前言
本文介绍flink-core源码包中的org.apache.flink.util.IOUtils输入输出工具类。该类提供了二进制流复制(copyBytes)、按指定长度完整读取(readFully)、跳过指定字节数(skipFully),以及文件流或Socket的关闭等相关操作。
二、源码说明
package org.apache.flink.util;@b@@b@import org.slf4j.Logger;@b@import java.io.IOException;@b@import java.io.InputStream;@b@import java.io.OutputStream;@b@import java.io.PrintStream;@b@import java.net.Socket;@b@@b@/**@b@ * An utility class for I/O related functionality.@b@ * @b@ */@b@public final class IOUtils {@b@@b@ /** The block size for byte operations in byte. */@b@ private static final int BLOCKSIZE = 4096;@b@ @b@ // ------------------------------------------------------------------------@b@ // Byte copy operations@b@ // ------------------------------------------------------------------------@b@@b@ /**@b@ * Copies from one stream to another.@b@ * @b@ * @param in@b@ * InputStream to read from@b@ * @param out@b@ * OutputStream to write to@b@ * @param buffSize@b@ * the size of the buffer@b@ * @param close@b@ * whether or not close the InputStream and OutputStream at the end. The streams are closed in the finally@b@ * clause.@b@ * @throws IOException@b@ * thrown if an error occurred while writing to the output stream@b@ */@b@ public static void copyBytes(final InputStream in, final OutputStream out, final int buffSize, final boolean close)@b@ throws IOException {@b@@b@ @SuppressWarnings("resource")@b@ final PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;@b@ final byte[] buf = new byte[buffSize];@b@ try {@b@ int bytesRead = in.read(buf);@b@ while (bytesRead >= 0) {@b@ out.write(buf, 0, bytesRead);@b@ if ((ps != null) && ps.checkError()) {@b@ throw new IOException("Unable to write to output stream.");@b@ }@b@ bytesRead = in.read(buf);@b@ }@b@ } finally {@b@ if (close) {@b@ out.close();@b@ in.close();@b@ }@b@ }@b@ }@b@@b@ /**@b@ * Copies from one stream to another. 
<strong>closes the input and output@b@ * streams at the end</strong>.@b@ * @b@ * @param in@b@ * InputStream to read from@b@ * @param out@b@ * OutputStream to write to@b@ * @throws IOException@b@ * thrown if an I/O error occurs while copying@b@ */@b@ public static void copyBytes(final InputStream in, final OutputStream out) throws IOException {@b@ copyBytes(in, out, BLOCKSIZE, true);@b@ }@b@@b@ /**@b@ * Copies from one stream to another.@b@ * @b@ * @param in@b@ * InputStream to read from@b@ * @param out@b@ * OutputStream to write to@b@ * @param close@b@ * whether or not close the InputStream and OutputStream at the@b@ * end. The streams are closed in the finally clause.@b@ * @throws IOException@b@ * thrown if an I/O error occurs while copying@b@ */@b@ public static void copyBytes(final InputStream in, final OutputStream out, final boolean close) throws IOException {@b@ copyBytes(in, out, BLOCKSIZE, close);@b@ }@b@@b@ // ------------------------------------------------------------------------@b@ // Stream input skipping@b@ // ------------------------------------------------------------------------@b@ @b@ /**@b@ * Reads len bytes in a loop.@b@ * @b@ * @param in@b@ * The InputStream to read from@b@ * @param buf@b@ * The buffer to fill@b@ * @param off@b@ * offset from the buffer@b@ * @param len@b@ * the length of bytes to read@b@ * @throws IOException@b@ * if it could not read requested number of bytes for any reason (including EOF)@b@ */@b@ public static void readFully(final InputStream in, final byte[] buf, int off, final int len)@b@ throws IOException {@b@ int toRead = len;@b@ while (toRead > 0) {@b@ final int ret = in.read(buf, off, toRead);@b@ if (ret < 0) {@b@ throw new IOException("Premeture EOF from inputStream");@b@ }@b@ toRead -= ret;@b@ off += ret;@b@ }@b@ }@b@@b@ /**@b@ * Similar to readFully(). 
Skips bytes in a loop.@b@ * @b@ * @param in@b@ * The InputStream to skip bytes from@b@ * @param len@b@ * number of bytes to skip@b@ * @throws IOException@b@ * if it could not skip requested number of bytes for any reason (including EOF)@b@ */@b@ public static void skipFully(final InputStream in, long len) throws IOException {@b@ while (len > 0) {@b@ final long ret = in.skip(len);@b@ if (ret < 0) {@b@ throw new IOException("Premeture EOF from inputStream");@b@ }@b@ len -= ret;@b@ }@b@ }@b@@b@ // ------------------------------------------------------------------------@b@ // Silent I/O cleanup / closing@b@ // ------------------------------------------------------------------------@b@ @b@ /**@b@ * Close the AutoCloseable objects and <b>ignore</b> any {@link Exception} or@b@ * null pointers. Must only be used for cleanup in exception handlers.@b@ * @b@ * @param log@b@ * the log to record problems to at debug level. Can be <code>null</code>.@b@ * @param closeables@b@ * the objects to close@b@ */@b@ public static void cleanup(final Logger log, final AutoCloseable... closeables) {@b@ for (AutoCloseable c : closeables) {@b@ if (c != null) {@b@ try {@b@ c.close();@b@ } catch (Exception e) {@b@ if (log != null && log.isDebugEnabled()) {@b@ log.debug("Exception in closing " + c, e);@b@ }@b@ }@b@ }@b@ }@b@ }@b@@b@ /**@b@ * Closes the stream ignoring {@link IOException}. 
Must only be called in@b@ * cleaning up from exception handlers.@b@ * @b@ * @param stream@b@ * the stream to close@b@ */@b@ public static void closeStream(final java.io.Closeable stream) {@b@ cleanup(null, stream);@b@ }@b@@b@ /**@b@ * Closes the socket ignoring {@link IOException}.@b@ * @b@ * @param sock@b@ * the socket to close@b@ */@b@ public static void closeSocket(final Socket sock) {@b@ // avoids try { close() } dance@b@ if (sock != null) {@b@ try {@b@ sock.close();@b@ } catch (IOException ignored) {@b@ }@b@ }@b@ }@b@@b@ /**@b@ * Closes all {@link AutoCloseable} objects in the parameter, suppressing exceptions. Exception will be emitted@b@ * after calling close() on every object.@b@ *@b@ * @param closeables iterable with closeables to close.@b@ * @throws Exception collected exceptions that occurred during closing@b@ */@b@ public static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {@b@ if (null != closeables) {@b@@b@ Exception collectedExceptions = null;@b@@b@ for (AutoCloseable closeable : closeables) {@b@ try {@b@ if (null != closeable) {@b@ closeable.close();@b@ }@b@ } catch (Exception e) {@b@ collectedExceptions = ExceptionUtils.firstOrSuppressed(collectedExceptions, e);@b@ }@b@ }@b@@b@ if (null != collectedExceptions) {@b@ throw collectedExceptions;@b@ }@b@ }@b@ }@b@@b@ /**@b@ * Closes all elements in the iterable with closeQuietly().@b@ */@b@ public static void closeAllQuietly(Iterable<? 
extends AutoCloseable> closeables) {@b@ if (null != closeables) {@b@ for (AutoCloseable closeable : closeables) {@b@ closeQuietly(closeable);@b@ }@b@ }@b@ }@b@@b@ /**@b@ * <p><b>Important:</b> This method is expected to never throw an exception.@b@ */@b@ public static void closeQuietly(AutoCloseable closeable) {@b@ try {@b@ if (closeable != null) {@b@ closeable.close();@b@ }@b@ } catch (Throwable ignored) {}@b@ }@b@ @b@ // ------------------------------------------------------------------------@b@@b@ /**@b@ * Private constructor to prevent instantiation.@b@ */@b@ private IOUtils() {}@b@}