I. Preface
The Flink source tree ships two distributed file system adapters, org.apache.flink.runtime.fs.hdfs.DistributedFileSystem and org.apache.flink.runtime.fs.s3.S3FileSystem. Both extend the abstract class org.apache.flink.core.fs.FileSystem defined in flink-core (see the flink-core section), which fixes the unified file system contract: a scheme-keyed factory with an instance cache, plus the abstract operations open, create, delete, mkdirs, listStatus and friends. The full sources follow below.
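Before diving into the listings, here is a minimal usage sketch (not taken from the Flink sources) of how a caller obtains a concrete file system through this factory. It assumes only that flink-core, with its bundled LocalFileSystem, is on the classpath:

import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

public class FileSystemFactoryDemo {
    public static void main(String[] args) throws Exception {
        // The URI scheme ("file", "hdfs", "maprfs" or "s3") selects the
        // implementation class registered in FileSystem's FSDIRECTORY map.
        FileSystem localFs = FileSystem.get(new URI("file:///tmp"));
        System.out.println("Working directory: " + localFs.getWorkingDirectory());

        // A second lookup with the same scheme/authority hits the FSKey cache
        // and returns the very same instance.
        FileSystem sameFs = FileSystem.get(new URI("file:///var"));
        System.out.println("Cached instance reused: " + (localFs == sameFs));
    }
}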
II. Source Code Walkthrough
1. The FileSystem abstract class
package org.apache.flink.core.fs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.util.ClassUtils;
import org.apache.flink.util.StringUtils;

public abstract class FileSystem {

    // implementation classes registered for the supported URI schemes
    private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
    private static final String HADOOP_DISTRIBUTED_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem";
    private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
    private static final String S3_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.s3.S3FileSystem";

    private static final Object SYNCHRONIZATION_OBJECT = new Object();

    // cache of instantiated file systems, keyed by scheme and authority
    private static final Map<FSKey, FileSystem> CACHE = new HashMap<FSKey, FileSystem>();

    // mapping from URI scheme to implementation class name
    private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();

    static {
        FSDIRECTORY.put("hdfs", HADOOP_DISTRIBUTED_FILESYSTEM_CLASS);
        FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
        FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
        FSDIRECTORY.put("s3", S3_FILESYSTEM_CLASS);
    }

    public static FileSystem getLocalFileSystem() throws IOException {
        URI localUri;
        try {
            localUri = new URI("file:///");
        } catch (URISyntaxException e) {
            throw new IOException("Cannot create URI for local file system");
        }
        return get(localUri);
    }

    public static FileSystem get(URI uri) throws IOException {
        FileSystem fs;

        synchronized (SYNCHRONIZATION_OBJECT) {

            // URIs without a scheme default to the local file system
            if (uri.getScheme() == null) {
                try {
                    uri = new URI("file", null, uri.getPath(), null);
                } catch (URISyntaxException e) {
                    throw new IOException("FileSystem: Scheme is null. file:// or hdfs:// are example schemes. "
                        + "Failed for " + uri.toString() + ".");
                }
            }

            final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

            // first check whether a file system for this scheme/authority is already cached
            if (CACHE.containsKey(key)) {
                return CACHE.get(key);
            }

            if (!FSDIRECTORY.containsKey(uri.getScheme())) {
                throw new IOException("No file system found with scheme " + uri.getScheme()
                    + ". Failed for " + uri.toString() + ".");
            }

            // load and instantiate the implementation class registered for the scheme
            Class<? extends FileSystem> fsClass;
            try {
                fsClass = ClassUtils.getFileSystemByName(FSDIRECTORY.get(uri.getScheme()));
            } catch (ClassNotFoundException e1) {
                throw new IOException(StringUtils.stringifyException(e1));
            }

            try {
                fs = fsClass.newInstance();
            } catch (InstantiationException e) {
                throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
            } catch (IllegalAccessException e) {
                throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
            }

            fs.initialize(uri);

            CACHE.put(key, fs);
        }

        return fs;
    }

    public abstract Path getWorkingDirectory();

    public abstract URI getUri();

    public abstract void initialize(URI uri) throws IOException;

    public abstract FileStatus getFileStatus(Path path) throws IOException;

    public abstract BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;

    public abstract FSDataInputStream open(Path path, int bufferSize) throws IOException;

    public abstract FSDataInputStream open(Path path) throws IOException;

    public long getDefaultBlockSize() {
        return 33554432L; // 32 MB
    }

    public abstract FileStatus[] listStatus(Path path) throws IOException;

    public boolean exists(Path f) throws IOException {
        try {
            return getFileStatus(f) != null;
        } catch (FileNotFoundException e) {
            return false;
        }
    }

    public abstract boolean delete(Path path, boolean recursive) throws IOException;

    public abstract boolean mkdirs(Path path) throws IOException;

    public abstract FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication,
            long blockSize) throws IOException;

    public abstract FSDataOutputStream create(Path path, boolean overwrite) throws IOException;

    public abstract boolean rename(Path src, Path dst) throws IOException;

    public boolean initOutPathLocalFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (isDistributedFS()) {
            return false;
        }

        // check whether the output path already exists
        FileStatus status = null;
        try {
            status = getFileStatus(outPath);
        } catch (FileNotFoundException e) {
            // path does not exist, which is fine
        }

        if (status != null) {
            switch (writeMode) {
            case NO_OVERWRITE:
                if (status.isDir() && createDirectory) {
                    return true;
                }
                throw new IOException("File or directory already exists. Existing files and directories are not overwritten in "
                    + WriteMode.NO_OVERWRITE.name() + " mode. Use "
                    + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
            case OVERWRITE:
                if (status.isDir()) {
                    if (createDirectory) {
                        // directory exists and does not need to be created
                        return true;
                    }
                    // an output file is requested, so the existing directory must be removed
                    try {
                        delete(outPath, true);
                    } catch (IOException ioe) {
                        // ignored; the final existence check below decides the result
                    }
                } else {
                    // delete the existing file
                    try {
                        delete(outPath, false);
                    } catch (IOException ioe) {
                        // ignored; the final existence check below decides the result
                    }
                }
                break;
            default:
                throw new IllegalArgumentException("Invalid write mode: " + writeMode);
            }
        }

        if (createDirectory) {
            try {
                if (!exists(outPath)) {
                    mkdirs(outPath);
                }
            } catch (IOException ioe) {
                // ignored; verified by the final check below
            }

            // double check that the output directory exists
            try {
                FileStatus check = getFileStatus(outPath);
                return check.isDir();
            } catch (FileNotFoundException e) {
                return false;
            }
        }

        // the output path must not exist, so that an output file can be created
        return !exists(outPath);
    }

    public boolean initOutPathDistFS(Path outPath, WriteMode writeMode, boolean createDirectory) throws IOException {
        if (!isDistributedFS()) {
            return false;
        }

        if (exists(outPath)) {
            switch (writeMode) {
            case NO_OVERWRITE:
                throw new IOException("File or directory already exists. Existing files and directories are not overwritten in "
                    + WriteMode.NO_OVERWRITE.name() + " mode. Use "
                    + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories.");
            case OVERWRITE:
                try {
                    delete(outPath, true);
                } catch (IOException ioe) {
                    // ignored; the final existence check below decides the result
                }
                break;
            default:
                throw new IllegalArgumentException("Invalid write mode: " + writeMode);
            }
        }

        if (createDirectory) {
            try {
                if (!exists(outPath)) {
                    mkdirs(outPath);
                }
            } catch (IOException ioe) {
                // ignored; verified by the final check below
            }

            return exists(outPath) && getFileStatus(outPath).isDir();
        }

        return !exists(outPath);
    }

    public abstract boolean isDistributedFS();

    public int getNumberOfBlocks(FileStatus file) throws IOException {
        int numberOfBlocks = 0;

        if (file == null) {
            return 0;
        }

        // for a regular file, derive the count from length and block size
        if (!file.isDir()) {
            return getNumberOfBlocks(file.getLen(), file.getBlockSize());
        }

        // for a directory, sum up the blocks of all contained files
        FileStatus[] files = listStatus(file.getPath());
        for (int i = 0; i < files.length; i++) {
            if (!files[i].isDir()) {
                numberOfBlocks += getNumberOfBlocks(files[i].getLen(), files[i].getBlockSize());
            }
        }

        return numberOfBlocks;
    }

    private int getNumberOfBlocks(long length, long blocksize) {
        if (blocksize != 0L) {
            // ceiling division: a partially filled last block still counts
            int numberOfBlocks = (int) (length / blocksize);
            if (length % blocksize != 0L) {
                numberOfBlocks++;
            }
            return numberOfBlocks;
        }
        return 1;
    }

    public static class FSKey {

        private String scheme;
        private String authority;

        public FSKey(String scheme, String authority) {
            this.scheme = scheme;
            this.authority = authority;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof FSKey) {
                FSKey key = (FSKey) obj;

                if (!this.scheme.equals(key.scheme)) {
                    return false;
                }

                if (this.authority == null || key.authority == null) {
                    return this.authority == null && key.authority == null;
                }

                return this.authority.equals(key.authority);
            }
            return false;
        }

        @Override
        public int hashCode() {
            if (this.scheme != null) {
                return this.scheme.hashCode();
            }
            if (this.authority != null) {
                return this.authority.hashCode();
            }
            return super.hashCode();
        }
    }

    public static enum WriteMode {
        NO_OVERWRITE, OVERWRITE;
    }
}
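To make the block arithmetic at the end of the class concrete: getNumberOfBlocks(long, long) is a plain ceiling division. The following standalone snippet mirrors the private helper (it does not call it) using the 32 MB value returned by getDefaultBlockSize():

public class BlockCountDemo {
    public static void main(String[] args) {
        long length = 100L * 1024 * 1024; // a 100 MB file
        long blockSize = 33554432L;       // 32 MB, the getDefaultBlockSize() value

        // same ceiling division as the private getNumberOfBlocks(long, long) helper
        int blocks = (int) (length / blockSize);
        if (length % blockSize != 0L) {
            blocks++;
        }

        System.out.println(blocks); // prints 4: 100 MB / 32 MB rounds up
    }
}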
2. The DistributedFileSystem implementation
package org.apache.flink.runtime.fs.hdfs;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public final class DistributedFileSystem extends org.apache.flink.core.fs.FileSystem {

    private static final Log LOG = LogFactory.getLog(DistributedFileSystem.class);

    private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";

    private static final String HDFS_IMPLEMENTATION_KEY = "fs.hdfs.impl";

    private final Configuration conf;

    private final FileSystem fs;

    public DistributedFileSystem() throws IOException {
        // create a new Hadoop configuration object
        this.conf = getHadoopConfiguration();

        Class<? extends FileSystem> fsClass = null;

        // try to get the FileSystem implementation class Hadoop 2.x style
        LOG.debug("Trying to load HDFS class Hadoop 2.x style.");

        Object fsHandle = null;
        try {
            Method newApi = FileSystem.class.getMethod("getFileSystemClass", String.class, Configuration.class);
            fsHandle = newApi.invoke(null, "hdfs", this.conf);
        } catch (Exception e) {
            // the new API is not available; fsHandle stays null and we assume
            // we are running against an older Hadoop version
        }

        if (fsHandle != null) {
            if (fsHandle instanceof Class && FileSystem.class.isAssignableFrom((Class<?>) fsHandle)) {
                fsClass = ((Class<?>) fsHandle).asSubclass(FileSystem.class);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
                }
            } else {
                LOG.debug("Unexpected return type from 'org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration)'.");
                throw new RuntimeException("The value returned from org.apache.hadoop.fs.FileSystem.getFileSystemClass(String, Configuration) is not a valid subclass of org.apache.hadoop.fs.FileSystem.");
            }
        }

        // fall back to the old Hadoop 1.x way of resolving the file system class
        if (fsClass == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Falling back to loading HDFS class old Hadoop style. Looking for HDFS class configuration entry '"
                    + HDFS_IMPLEMENTATION_KEY + "'.");
            }

            Class<?> classFromConfig = this.conf.getClass(HDFS_IMPLEMENTATION_KEY, null);

            if (classFromConfig != null) {
                if (FileSystem.class.isAssignableFrom(classFromConfig)) {
                    fsClass = classFromConfig.asSubclass(FileSystem.class);

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
                    }

                    throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY
                        + " cannot be cast to a FileSystem type.");
                }
            } else {
                // as a last resort, try to load the default HDFS implementation
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
                }

                try {
                    Class<?> reflectedClass = Class.forName(DEFAULT_HDFS_CLASS);
                    if (FileSystem.class.isAssignableFrom(reflectedClass)) {
                        fsClass = reflectedClass.asSubclass(FileSystem.class);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Default HDFS class is of wrong type.");
                        }

                        throw new IOException("The default HDFS class '" + DEFAULT_HDFS_CLASS
                            + "' cannot be cast to a FileSystem type.");
                    }
                } catch (ClassNotFoundException e) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Default HDFS class cannot be loaded.");
                    }

                    throw new IOException("No HDFS class has been configured and the default class '"
                        + DEFAULT_HDFS_CLASS + "' cannot be loaded.");
                }
            }
        }

        this.fs = instantiateFileSystem(fsClass);
    }

    public static Configuration getHadoopConfiguration() {
        Configuration retConf = new Configuration();

        // approach 1: load the Hadoop configuration files named in the Flink configuration
        String hdfsDefaultPath = GlobalConfiguration.getString("fs.hdfs.hdfsdefault", null);
        if (hdfsDefaultPath != null) {
            retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        } else {
            LOG.debug("Cannot find hdfs-default configuration file");
        }

        String hdfsSitePath = GlobalConfiguration.getString("fs.hdfs.hdfssite", null);
        if (hdfsSitePath != null) {
            retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        } else {
            LOG.debug("Cannot find hdfs-site configuration file");
        }

        // approach 2: probe well-known locations derived from the Flink
        // configuration and the HADOOP_CONF_DIR / HADOOP_HOME environment variables
        String[] possibleHadoopConfPaths = new String[4];
        possibleHadoopConfPaths[0] = GlobalConfiguration.getString("fs.hdfs.hadoopconf", null);
        possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");

        if (System.getenv("HADOOP_HOME") != null) {
            possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME") + "/conf";
            possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME") + "/etc/hadoop"; // Hadoop 2.2
        }

        for (int i = 0; i < possibleHadoopConfPaths.length; i++) {
            // skip candidate locations that are not set
            if (possibleHadoopConfPaths[i] == null) {
                continue;
            }

            if (new File(possibleHadoopConfPaths[i]).exists()) {
                if (new File(possibleHadoopConfPaths[i] + "/core-site.xml").exists()) {
                    retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPaths[i] + "/core-site.xml"));

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding " + possibleHadoopConfPaths[i] + "/core-site.xml to hadoop configuration");
                    }
                }
                if (new File(possibleHadoopConfPaths[i] + "/hdfs-site.xml").exists()) {
                    retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPaths[i] + "/hdfs-site.xml"));

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding " + possibleHadoopConfPaths[i] + "/hdfs-site.xml to hadoop configuration");
                    }
                }
            }
        }

        return retConf;
    }

    private FileSystem instantiateFileSystem(Class<? extends FileSystem> fsClass) throws IOException {
        try {
            return fsClass.newInstance();
        } catch (ExceptionInInitializerError e) {
            throw new IOException("The filesystem class '" + fsClass.getName()
                + "' throw an exception upon initialization.", e.getException());
        } catch (Throwable t) {
            String errorMessage = InstantiationUtil.checkForInstantiationError(fsClass);
            if (errorMessage != null) {
                throw new IOException("The filesystem class '" + fsClass.getName()
                    + "' cannot be instantiated: " + errorMessage);
            }
            throw new IOException("An error occurred while instantiating the filesystem class '"
                + fsClass.getName() + "'.", t);
        }
    }

    public org.apache.flink.core.fs.Path getWorkingDirectory() {
        return new org.apache.flink.core.fs.Path(this.fs.getWorkingDirectory().toUri());
    }

    public URI getUri() {
        return this.fs.getUri();
    }

    public void initialize(URI path) throws IOException {
        // HDFS needs an authority (the NameNode address); if the URI has none,
        // fall back to the default file system configured in Hadoop
        if (path.getAuthority() == null) {

            String configEntry = this.conf.get("fs.default.name", null);
            if (configEntry == null) {
                // fs.default.name is deprecated in Hadoop 2.x, try the new key
                configEntry = this.conf.get("fs.defaultFS", null);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("fs.defaultFS is set to " + configEntry);
            }

            if (configEntry == null) {
                throw new IOException(getMissingAuthorityErrorPrefix(path)
                    + "Either no default hdfs configuration was registered, "
                    + "or that configuration did not contain an entry for the default hdfs.");
            }

            try {
                URI initURI = URI.create(configEntry);

                if (initURI.getAuthority() == null) {
                    throw new IOException(getMissingAuthorityErrorPrefix(path)
                        + "Either no default hdfs configuration was registered, "
                        + "or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port.");
                } else if (!initURI.getScheme().equalsIgnoreCase("hdfs")) {
                    throw new IOException(getMissingAuthorityErrorPrefix(path)
                        + "Either no default hdfs configuration was registered, "
                        + "or the provided configuration describes a file system with scheme '"
                        + initURI.getScheme() + "' other than the Hadoop Distributed File System (HDFS).");
                } else {
                    try {
                        this.fs.initialize(initURI, this.conf);
                    } catch (Exception e) {
                        // note: the decompiler did not recover the original exception message here
                        throw new IOException(".", e);
                    }
                }
            } catch (IllegalArgumentException e) {
                throw new IOException(getMissingAuthorityErrorPrefix(path)
                    + "The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS): "
                    + configEntry);
            }
        } else {
            // the URI itself names the NameNode host and port
            try {
                this.fs.initialize(path, this.conf);
            } catch (Exception e) {
                throw new IOException("The given file URI (" + path.toString()
                    + ") described the host and port of an HDFS Namenode, but the File System could not be initialized with that address.", e);
            }
        }
    }

    private static final String getMissingAuthorityErrorPrefix(URI path) {
        return "The given HDFS file URI (" + path.toString() + ") did not describe the HDFS Namenode."
            + " The attempt to use a default HDFS configuration, as specified in the '"
            + "fs.hdfs.hdfsdefault" + "' or '" + "fs.hdfs.hdfssite"
            + "' config parameter failed due to the following problem: ";
    }

    public org.apache.flink.core.fs.FileStatus getFileStatus(org.apache.flink.core.fs.Path f) throws IOException {
        org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
        return new DistributedFileStatus(status);
    }

    public org.apache.flink.core.fs.BlockLocation[] getFileBlockLocations(org.apache.flink.core.fs.FileStatus file,
            long start, long len) throws IOException {
        if (!(file instanceof DistributedFileStatus)) {
            throw new IOException("file is not an instance of DistributedFileStatus");
        }

        DistributedFileStatus f = (DistributedFileStatus) file;

        org.apache.hadoop.fs.BlockLocation[] blkLocations =
            this.fs.getFileBlockLocations(f.getInternalFileStatus(), start, len);

        // wrap the Hadoop block locations in Flink's adapter type
        DistributedBlockLocation[] distBlkLocations = new DistributedBlockLocation[blkLocations.length];
        for (int i = 0; i < distBlkLocations.length; i++) {
            distBlkLocations[i] = new DistributedBlockLocation(blkLocations[i]);
        }

        return distBlkLocations;
    }

    public org.apache.flink.core.fs.FSDataInputStream open(org.apache.flink.core.fs.Path f, int bufferSize)
            throws IOException {
        org.apache.hadoop.fs.FSDataInputStream fdis =
            this.fs.open(new org.apache.hadoop.fs.Path(f.toString()), bufferSize);
        return new DistributedDataInputStream(fdis);
    }

    public org.apache.flink.core.fs.FSDataInputStream open(org.apache.flink.core.fs.Path f) throws IOException {
        org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(new org.apache.hadoop.fs.Path(f.toString()));
        return new DistributedDataInputStream(fdis);
    }

    public FSDataOutputStream create(org.apache.flink.core.fs.Path f, boolean overwrite, int bufferSize,
            short replication, long blockSize) throws IOException {
        org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
            new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
        return new DistributedDataOutputStream(fdos);
    }

    public FSDataOutputStream create(org.apache.flink.core.fs.Path f, boolean overwrite) throws IOException {
        org.apache.hadoop.fs.FSDataOutputStream fdos =
            this.fs.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
        return new DistributedDataOutputStream(fdos);
    }

    public boolean delete(org.apache.flink.core.fs.Path f, boolean recursive) throws IOException {
        return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()), recursive);
    }

    public org.apache.flink.core.fs.FileStatus[] listStatus(org.apache.flink.core.fs.Path f) throws IOException {
        org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
        org.apache.flink.core.fs.FileStatus[] files = new org.apache.flink.core.fs.FileStatus[hadoopFiles.length];

        for (int i = 0; i < files.length; i++) {
            files[i] = new DistributedFileStatus(hadoopFiles[i]);
        }

        return files;
    }

    public boolean mkdirs(org.apache.flink.core.fs.Path f) throws IOException {
        return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
    }

    public boolean rename(org.apache.flink.core.fs.Path src, org.apache.flink.core.fs.Path dst) throws IOException {
        return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
            new org.apache.hadoop.fs.Path(dst.toString()));
    }

    public long getDefaultBlockSize() {
        return this.fs.getDefaultBlockSize();
    }

    public boolean isDistributedFS() {
        return true;
    }
}
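As a rough illustration of how this adapter is reached in practice, the sketch below reads an hdfs:// URI entirely through the flink-core facade. The host, port and file path are placeholders, and a reachable HDFS cluster plus the Hadoop client jars on the classpath are assumed:

import java.net.URI;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class HdfsReadDemo {
    public static void main(String[] args) throws Exception {
        // the "hdfs" scheme routes to DistributedFileSystem; the authority
        // names the NameNode (placeholder values below)
        URI uri = new URI("hdfs://namenode.example.com:8020/user/flink/input.txt");

        FileSystem fs = FileSystem.get(uri);
        FSDataInputStream in = fs.open(new Path(uri));
        try {
            byte[] buf = new byte[1024];
            int read = in.read(buf);
            System.out.println("Read " + read + " bytes");
        } finally {
            in.close();
        }
    }
}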
3. The S3FileSystem implementation
package org.apache.flink.runtime.fs.s3;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;

public final class S3FileSystem extends FileSystem {

    private static final Log LOG = LogFactory.getLog(S3FileSystem.class);

    // configuration keys
    public static final String S3_HOST_KEY = "fs.s3.host";
    public static final String S3_PORT_KEY = "fs.s3.port";
    public static final String S3_RRS_KEY = "fs.s3.rrs";
    public static final String S3_ACCESS_KEY_KEY = "fs.s3.accessKey";
    public static final String S3_SECRET_KEY_KEY = "fs.s3.secretKey";

    // defaults
    private static final String DEFAULT_S3_HOST = "s3.amazonaws.com";
    private static final boolean DEFAULT_S3_RRS = true;
    private static final int DEFAULT_S3_PORT = 80;

    private static final String HTTP_PREFIX = "http";
    private static final int HTTP_RESOURCE_NOT_FOUND_CODE = 404;
    private static final char S3_DIRECTORY_SEPARATOR = '/';
    public static final String S3_SCHEME = "s3";
    private static final String URL_ENCODE_CHARACTER = "UTF-8";

    private String host = null;
    private int port = -1;
    private URI s3Uri = null;
    private AmazonS3Client s3Client = null;
    private S3DirectoryStructure directoryStructure = null;

    private final boolean useRRS;

    public S3FileSystem() {
        this.useRRS = GlobalConfiguration.getBoolean(S3_RRS_KEY, DEFAULT_S3_RRS);
        LOG.info("Creating new S3 file system binding with Reduced Redundancy Storage "
            + (this.useRRS ? "enabled" : "disabled"));
    }

    public Path getWorkingDirectory() {
        return new Path(this.s3Uri);
    }

    public URI getUri() {
        return this.s3Uri;
    }

    public void initialize(URI name) throws IOException {

        this.host = name.getHost();
        if (this.host == null) {
            LOG.debug("Provided URI does not provide a host to connect to, using configuration...");
            this.host = GlobalConfiguration.getString(S3_HOST_KEY, DEFAULT_S3_HOST);
        }

        this.port = name.getPort();
        if (this.port == -1) {
            LOG.debug("Provided URI does not provide a port to connect to, using configuration...");
            this.port = GlobalConfiguration.getInteger(S3_PORT_KEY, DEFAULT_S3_PORT);
        }

        // credentials may be embedded in the URI's user-info part as
        // "<accessKey>:<secretKey>", both URL-encoded
        final String userInfo = name.getUserInfo();

        String awsAccessKey = null;
        String awsSecretKey = null;

        if (userInfo != null) {
            final String[] splits = userInfo.split(":");
            if (splits.length > 1) {
                awsAccessKey = URLDecoder.decode(splits[0], URL_ENCODE_CHARACTER);
                awsSecretKey = URLDecoder.decode(splits[1], URL_ENCODE_CHARACTER);
            }
        }

        if (awsAccessKey == null) {
            LOG.debug("Provided URI does not provide an access key to Amazon S3, using configuration...");
            awsAccessKey = GlobalConfiguration.getString(S3_ACCESS_KEY_KEY, null);
            if (awsAccessKey == null) {
                throw new IOException("Cannot determine access key to Amazon S3");
            }
        }

        if (awsSecretKey == null) {
            LOG.debug("Provided URI does not provide a secret key to Amazon S3, using configuration...");
            awsSecretKey = GlobalConfiguration.getString(S3_SECRET_KEY_KEY, null);
            if (awsSecretKey == null) {
                throw new IOException("Cannot determine secret key to Amazon S3");
            }
        }

        final AWSCredentials credentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
        this.s3Client = new AmazonS3Client(credentials);

        initializeDirectoryStructure(name);
    }

    private void initializeDirectoryStructure(URI name) throws IOException {

        String basePath = name.getPath();

        // keep truncating the path until the endpoint accepts the connection
        while (true) {
            try {
                final String endpoint = new URL(HTTP_PREFIX, this.host, this.port, basePath).toString();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Trying S3 endpoint " + endpoint);
                }

                this.s3Client.setEndpoint(endpoint);
                final Owner owner = this.s3Client.getS3AccountOwner();
                LOG.info("Successfully established connection to Amazon S3 using the endpoint " + endpoint);
                LOG.info("Amazon S3 user is " + owner.getDisplayName());
                break;

            } catch (MalformedURLException e) {
                throw new IOException(StringUtils.stringifyException(e));
            } catch (AmazonClientException e) {
                // truncate the last path segment and retry
                if (basePath.isEmpty()) {
                    throw new IOException("Cannot establish connection to Amazon S3: "
                        + StringUtils.stringifyException(e));
                }

                final int pos = basePath.lastIndexOf("/");
                if (pos < 0) {
                    basePath = "";
                } else {
                    basePath = basePath.substring(0, pos);
                }
            }
        }

        try {
            this.s3Uri = new URI(S3_SCHEME, (String) null, this.host, this.port, basePath, null, null);
        } catch (URISyntaxException e) {
            throw new IOException(StringUtils.stringifyException(e));
        }

        this.directoryStructure = new S3DirectoryStructure(basePath);
    }

    public FileStatus getFileStatus(Path f) throws IOException {

        final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);

        // the path names neither a bucket nor an object: this is the root directory
        if (!bop.hasBucket() && !bop.hasObject()) {
            return new S3FileStatus(f, 0L, true, 0L, 0L);
        }

        try {
            if (bop.hasBucket() && !bop.hasObject()) {
                // the path names a bucket: look it up in the account's bucket list
                final List<Bucket> buckets = this.s3Client.listBuckets();
                final Iterator<Bucket> it = buckets.iterator();

                while (it.hasNext()) {
                    final Bucket bucket = it.next();
                    if (bop.getBucket().equals(bucket.getName())) {
                        final long creationDate = dateToLong(bucket.getCreationDate());
                        return new S3FileStatus(f, 0L, true, creationDate, 0L);
                    }
                }

                throw new FileNotFoundException("Cannot find " + f.toUri());
            }

            try {
                final ObjectMetadata om = this.s3Client.getObjectMetadata(bop.getBucket(), bop.getObject());
                final long modificationDate = dateToLong(om.getLastModified());

                if (objectRepresentsDirectory(bop.getObject(), om.getContentLength())) {
                    return new S3FileStatus(f, 0L, true, modificationDate, 0L);
                }
                return new S3FileStatus(f, om.getContentLength(), false, modificationDate, 0L);

            } catch (AmazonServiceException e) {
                if (e.getStatusCode() == HTTP_RESOURCE_NOT_FOUND_CODE) {
                    throw new FileNotFoundException("Cannot find " + f.toUri());
                }
                throw e;
            }
        } catch (AmazonClientException e) {
            throw new IOException(StringUtils.stringifyException(e));
        }
    }

    private static long dateToLong(Date date) {
        if (date == null) {
            return 0L;
        }
        return date.getTime();
    }

    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {

        if (start + len > file.getLen()) {
            return null;
        }

        // S3 objects are not split into blocks; report one location spanning the file
        final S3BlockLocation bl = new S3BlockLocation(this.host, file.getLen());

        return new BlockLocation[] { bl };
    }

    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        // bufferSize is ignored for S3 streams
        return open(f);
    }

    public FSDataInputStream open(Path f) throws IOException {

        final FileStatus fileStatus = getFileStatus(f);

        if (fileStatus.isDir()) {
            throw new IOException("Cannot open " + f.toUri() + " because it is a directory");
        }

        final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
        if (!bop.hasBucket() || !bop.hasObject()) {
            throw new IOException(f.toUri() + " cannot be opened");
        }

        return new S3DataInputStream(this.s3Client, bop.getBucket(), bop.getObject());
    }

    public FileStatus[] listStatus(Path f) throws IOException {

        final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);

        try {
            if (!bop.hasBucket()) {
                // the root directory: list all buckets of the account
                final List<Bucket> list = this.s3Client.listBuckets();
                final S3FileStatus[] array = new S3FileStatus[list.size()];
                final Iterator<Bucket> it = list.iterator();
                int i = 0;
                while (it.hasNext()) {
                    final Bucket bucket = it.next();
                    final long creationDate = dateToLong(bucket.getCreationDate());
                    final S3FileStatus status = new S3FileStatus(
                        extendPath(f, bucket.getName() + S3_DIRECTORY_SEPARATOR), 0L, true, creationDate, 0L);
                    array[i++] = status;
                }

                return array;
            }

            if (bop.hasBucket() && !bop.hasObject()) {
                // the path names a bucket: list the bucket's content
                if (!this.s3Client.doesBucketExist(bop.getBucket())) {
                    throw new FileNotFoundException("Cannot find " + f.toUri());
                }

                return listBucketContent(f, bop);
            }

            final ObjectMetadata omd = this.s3Client.getObjectMetadata(bop.getBucket(), bop.getObject());
            if (objectRepresentsDirectory(bop.getObject(), omd.getContentLength())) {
                return listBucketContent(f, bop);
            }

            // the object is a regular file
            final S3FileStatus fileStatus = new S3FileStatus(f, omd.getContentLength(), false,
                dateToLong(omd.getLastModified()), 0L);

            return new FileStatus[] { fileStatus };

        } catch (AmazonClientException e) {
            throw new IOException(StringUtils.stringifyException(e));
        }
    }

    private S3FileStatus[] listBucketContent(Path f, S3BucketObjectPair bop) throws IOException {

        ObjectListing listing = null;
        final List<S3FileStatus> resultList = new ArrayList<S3FileStatus>();

        final int depth = bop.hasObject() ? getDepth(bop.getObject()) + 1 : 0;

        while (true) {

            if (listing == null) {
                if (bop.hasObject()) {
                    listing = this.s3Client.listObjects(bop.getBucket(), bop.getObject());
                } else {
                    listing = this.s3Client.listObjects(bop.getBucket());
                }
            } else {
                listing = this.s3Client.listNextBatchOfObjects(listing);
            }

            final List<S3ObjectSummary> list = listing.getObjectSummaries();
            final Iterator<S3ObjectSummary> it = list.iterator();
            while (it.hasNext()) {

                final S3ObjectSummary os = it.next();
                String key = os.getKey();

                // skip objects that live in a subdirectory of the listed path
                final int childDepth = getDepth(os.getKey());
                if (childDepth != depth) {
                    continue;
                }

                // strip the prefix of the parent object
                if (bop.hasObject()) {
                    if (key.startsWith(bop.getObject())) {
                        key = key.substring(bop.getObject().length());
                    }
                    // an empty remainder is the parent object itself
                    if (key.isEmpty()) {
                        continue;
                    }
                }

                final long modificationDate = dateToLong(os.getLastModified());

                S3FileStatus fileStatus;
                if (objectRepresentsDirectory(os)) {
                    fileStatus = new S3FileStatus(extendPath(f, key), 0L, true, modificationDate, 0L);
                } else {
                    fileStatus = new S3FileStatus(extendPath(f, key), os.getSize(), false, modificationDate, 0L);
                }

                resultList.add(fileStatus);
            }

            if (!listing.isTruncated()) {
                break;
            }
        }

        return resultList.toArray(new S3FileStatus[0]);
    }

    private static int getDepth(String key) {

        int depth = 0;
        int nextStartPos = 0;

        final int length = key.length();

        while (nextStartPos < length) {
            final int sepPos = key.indexOf(S3_DIRECTORY_SEPARATOR, nextStartPos);
            if (sepPos < 0) {
                break;
            }
            ++depth;
            nextStartPos = sepPos + 1;
        }

        // a trailing separator marks a directory and does not add depth
        if (length > 0 && key.charAt(length - 1) == S3_DIRECTORY_SEPARATOR) {
            --depth;
        }

        return depth;
    }

    public boolean delete(Path f, boolean recursive) throws IOException {

        try {
            final FileStatus fileStatus = getFileStatus(f);
            final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);

            if (fileStatus.isDir()) {

                boolean retVal = false;
                final FileStatus[] dirContent = listStatus(f);
                if (dirContent.length > 0) {
                    // the directory is not empty
                    if (!recursive) {
                        throw new IOException("Found non-empty directory " + f
                            + " while performing non-recursive delete");
                    }

                    for (final FileStatus entry : dirContent) {
                        if (delete(entry.getPath(), true)) {
                            retVal = true;
                        }
                    }
                }

                if (!bop.hasBucket()) {
                    // this is the root directory; there is nothing left to delete
                    return retVal;
                }

                if (!bop.hasObject()) {
                    // the directory is a bucket
                    this.s3Client.deleteBucket(bop.getBucket());
                } else {
                    // the directory is represented by a marker object
                    this.s3Client.deleteObject(bop.getBucket(), bop.getObject());
                }
            } else {
                // a regular file
                this.s3Client.deleteObject(bop.getBucket(), bop.getObject());
            }
        } catch (AmazonClientException e) {
            throw new IOException(StringUtils.stringifyException(e));
        }

        return true;
    }

    public boolean mkdirs(Path f) throws IOException {
        // NOTE: the decompiler did not recover the body of this method; judging
        // from the createEmptyObject() helper below, it presumably creates the
        // missing directory levels as empty objects whose keys end with '/'.
        throw new UnsupportedOperationException("Method body was not recovered from the decompiled class.");
    }

    private void createEmptyObject(String bucketName, String objectName) {
        // a zero-length stream: the stored object only serves as a directory marker
        final InputStream im = new InputStream() {
            @Override
            public int read() throws IOException {
                return -1;
            }
        };

        final ObjectMetadata om = new ObjectMetadata();
        om.setContentLength(0L);

        this.s3Client.putObject(bucketName, objectName, im, om);
    }

    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize)
            throws IOException {

        if (!overwrite && exists(f)) {
            throw new IOException(f.toUri() + " already exists");
        }

        final S3BucketObjectPair bop = this.directoryStructure.toBucketObjectPair(f);
        if (!bop.hasBucket() || !bop.hasObject()) {
            throw new IOException(f.toUri() + " is not a valid path to create a new file");
        }

        // S3 multipart uploads require parts of at least 5 MB
        if (bufferSize < 5242880) {
            throw new IOException("Provided buffer must be at least 5242880 bytes");
        }

        final byte[] buf = new byte[bufferSize];

        return new S3DataOutputStream(this.s3Client, bop.getBucket(), bop.getObject(), buf, this.useRRS);
    }

    public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
        // 5 MB buffer; replication and block size are meaningless for S3
        return create(f, overwrite, 5242880, (short) 1, 1024L);
    }

    private boolean objectRepresentsDirectory(S3ObjectSummary os) {
        return objectRepresentsDirectory(os.getKey(), os.getSize());
    }

    private boolean objectRepresentsDirectory(String name, long size) {
        if (name.isEmpty()) {
            return false;
        }

        // directories are zero-length objects whose keys end with the separator
        return name.charAt(name.length() - 1) == S3_DIRECTORY_SEPARATOR && size == 0L;
    }

    static Path extendPath(Path parent, String extension) throws IOException {

        final URI parentUri = parent.toUri();

        if (extension.isEmpty()) {
            return parent;
        }

        final String path = parentUri.getPath();
        String extendedPath;

        if (path.isEmpty()) {
            if (extension.charAt(0) == '/') {
                extendedPath = extension;
            } else {
                extendedPath = "/" + extension;
            }
        } else if (path.charAt(path.length() - 1) == '/') {
            if (extension.charAt(0) == '/') {
                if (extension.length() > 1) {
                    extendedPath = path + extension.substring(1);
                } else {
                    extendedPath = path;
                }
            } else {
                extendedPath = path + extension;
            }
        } else {
            if (extension.charAt(0) == '/') {
                extendedPath = path + extension;
            } else {
                extendedPath = path + "/" + extension;
            }
        }

        try {
            final URI extendedUri = new URI(parentUri.getScheme(), "", extendedPath,
                parentUri.getQuery(), parentUri.getFragment());
            return new Path(extendedUri);
        } catch (URISyntaxException e) {
            throw new IOException(StringUtils.stringifyException(e));
        }
    }

    public boolean rename(Path src, Path dst) throws IOException {
        throw new UnsupportedOperationException("This method is not yet implemented");
    }

    public boolean isDistributedFS() {
        return true;
    }
}
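Finally, a speculative sketch of the credential convention that initialize() parses: the access and secret key may ride in the URI's user-info part, URL-encoded and separated by ':'; host and port fall back to fs.s3.host / fs.s3.port when absent. All key, host and bucket values below are placeholders, and the flink-runtime S3 classes plus the AWS SDK are assumed to be on the classpath:

import java.net.URI;
import java.net.URLEncoder;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class S3ListDemo {
    public static void main(String[] args) throws Exception {
        // keys must be URL-encoded, exactly as initialize() URL-decodes them
        String accessKey = URLEncoder.encode("MY_ACCESS_KEY", "UTF-8");
        String secretKey = URLEncoder.encode("MY_SECRET_KEY", "UTF-8");

        // placeholder endpoint and bucket
        URI uri = new URI("s3://" + accessKey + ":" + secretKey
            + "@s3.amazonaws.com:80/my-bucket/");

        FileSystem fs = FileSystem.get(uri);
        for (FileStatus status : fs.listStatus(new Path(uri))) {
            System.out.println(status.getPath() + (status.isDir() ? " (dir)" : ""));
        }
    }
}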