一、前言
本文介绍 mahout 源码包中的工具类 org.apache.mahout.common.HadoopUtil：它封装了对 Apache Hadoop 的常用操作，例如 org.apache.hadoop.mapreduce.Job 任务的创建与准备（prepareJob）、删除 HDFS 路径、统计序列文件记录数，以及获取分布式缓存文件路径（getSingleCachedFile / getCachedFiles）等。
二、源码说明
package org.apache.mahout.common;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for preparing Hadoop MapReduce {@link Job}s and for common
 * filesystem / {@link DistributedCache} chores: deleting paths, counting
 * records in sequence files, resolving cached files, and building
 * comma-separated input-directory lists.
 *
 * <p>All methods are stateless; the class is a non-instantiable utility.</p>
 */
public final class HadoopUtil {

  private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);

  private HadoopUtil() { }

  /**
   * Prepares a map-only job (zero reduce tasks). The map output key/value
   * classes double as the job output classes.
   *
   * @param inputPath    input directory ({@code mapred.input.dir})
   * @param outputPath   output directory ({@code mapred.output.dir})
   * @param inputFormat  input format class
   * @param mapper       mapper class; must not be the identity {@link Mapper},
   *                     since it is also used to locate the job jar
   * @param mapperKey    map output key class
   * @param mapperValue  map output value class
   * @param outputFormat output format class
   * @param conf         configuration to copy into the job
   * @return a configured but unsubmitted {@link Job}
   * @throws IOException           if the job cannot be created
   * @throws IllegalStateException if {@code mapper} is the raw {@link Mapper}
   */
  public static Job prepareJob(Path inputPath,
                               Path outputPath,
                               Class<? extends InputFormat> inputFormat,
                               Class<? extends Mapper> mapper,
                               Class<? extends Writable> mapperKey,
                               Class<? extends Writable> mapperValue,
                               Class<? extends OutputFormat> outputFormat,
                               Configuration conf) throws IOException {

    // Copy the configuration so later mutations by the caller don't leak into the job.
    Job job = new Job(new Configuration(conf));
    Configuration jobConf = job.getConfiguration();

    // The raw Mapper class lives in the Hadoop jar and cannot identify the user's job jar.
    if (mapper.equals(Mapper.class)) {
      throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
    }
    job.setJarByClass(mapper);

    job.setInputFormatClass(inputFormat);
    jobConf.set("mapred.input.dir", inputPath.toString());

    job.setMapperClass(mapper);
    job.setMapOutputKeyClass(mapperKey);
    job.setMapOutputValueClass(mapperValue);
    // Map-only job: the map output is the job output.
    job.setOutputKeyClass(mapperKey);
    job.setOutputValueClass(mapperValue);
    jobConf.setBoolean("mapred.compress.map.output", true);
    job.setNumReduceTasks(0);

    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath.toString());

    return job;
  }

  /**
   * Prepares a full map-reduce job.
   *
   * @param inputPath    input directory ({@code mapred.input.dir})
   * @param outputPath   output directory ({@code mapred.output.dir})
   * @param inputFormat  input format class
   * @param mapper       mapper class
   * @param mapperKey    map output key class; skipped if {@code null}
   * @param mapperValue  map output value class; skipped if {@code null}
   * @param reducer      reducer class; the identity {@link Reducer} is allowed,
   *                     in which case the mapper must identify the job jar
   * @param reducerKey   job output key class
   * @param reducerValue job output value class
   * @param outputFormat output format class
   * @param conf         configuration to copy into the job
   * @return a configured but unsubmitted {@link Job}
   * @throws IOException           if the job cannot be created
   * @throws IllegalStateException if both mapper and reducer are the raw
   *                               Hadoop identity classes
   */
  public static Job prepareJob(Path inputPath,
                               Path outputPath,
                               Class<? extends InputFormat> inputFormat,
                               Class<? extends Mapper> mapper,
                               Class<? extends Writable> mapperKey,
                               Class<? extends Writable> mapperValue,
                               Class<? extends Reducer> reducer,
                               Class<? extends Writable> reducerKey,
                               Class<? extends Writable> reducerValue,
                               Class<? extends OutputFormat> outputFormat,
                               Configuration conf) throws IOException {

    // Copy the configuration so later mutations by the caller don't leak into the job.
    Job job = new Job(new Configuration(conf));
    Configuration jobConf = job.getConfiguration();

    // Prefer the reducer for locating the job jar; fall back to the mapper.
    if (reducer.equals(Reducer.class)) {
      if (mapper.equals(Mapper.class)) {
        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
      }
      job.setJarByClass(mapper);
    } else {
      job.setJarByClass(reducer);
    }

    job.setInputFormatClass(inputFormat);
    jobConf.set("mapred.input.dir", inputPath.toString());

    job.setMapperClass(mapper);
    // Map output classes are optional; Hadoop then defaults them to the job output classes.
    if (mapperKey != null) {
      job.setMapOutputKeyClass(mapperKey);
    }
    if (mapperValue != null) {
      job.setMapOutputValueClass(mapperValue);
    }

    jobConf.setBoolean("mapred.compress.map.output", true);

    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);
    job.setOutputValueClass(reducerValue);

    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath.toString());

    return job;
  }

  /**
   * Builds a job name of the form {@code <base>-<MapperSimpleName>-<ReducerSimpleName>},
   * where the base is the context's job name, or {@code className} if none is set.
   */
  public static String getCustomJobName(String className, JobContext job,
                                        Class<? extends Mapper> mapper,
                                        Class<? extends Reducer> reducer) {
    StringBuilder name = new StringBuilder(100);
    String customJobName = job.getJobName();
    if (customJobName == null || customJobName.trim().isEmpty()) {
      name.append(className);
    } else {
      name.append(customJobName);
    }
    name.append('-').append(mapper.getSimpleName());
    name.append('-').append(reducer.getSimpleName());
    return name.toString();
  }

  /**
   * Recursively deletes every existing path in {@code paths}, each on its own
   * filesystem. Nonexistent paths are skipped silently.
   *
   * @param conf configuration; a fresh default one is used if {@code null}
   * @throws IOException on filesystem error
   */
  public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
    if (conf == null) {
      conf = new Configuration();
    }
    for (Path path : paths) {
      FileSystem fs = path.getFileSystem(conf);
      if (fs.exists(path)) {
        log.info("Deleting {}", path);
        fs.delete(path, true);
      }
    }
  }

  /** Varargs convenience overload of {@link #delete(Configuration, Iterable)}. */
  public static void delete(Configuration conf, Path... paths) throws IOException {
    delete(conf, Arrays.asList(paths));
  }

  /**
   * Counts the records in a single sequence file by iterating its values.
   *
   * @throws IOException if the file cannot be read
   */
  public static long countRecords(Path path, Configuration conf) throws IOException {
    long count = 0;
    // NOTE(review): the iterator is never explicitly closed; Mahout's sequence-file
    // iterators appear to close their reader on exhaustion, and this loop always
    // exhausts them — TODO confirm against SequenceFileValueIterator.
    Iterator<?> iterator = new SequenceFileValueIterator<>(path, true, conf);
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
    return count;
  }

  /**
   * Counts records across all sequence files selected by {@code path},
   * {@code pt} (single path vs. glob) and {@code filter}.
   *
   * @throws IOException if a file cannot be read
   */
  public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {
    long count = 0;
    // See the single-file overload regarding iterator closing.
    Iterator<?> iterator = new SequenceFileDirValueIterator<>(path, pt, filter, null, true, conf);
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
    return count;
  }

  /**
   * Opens a read stream for {@code path} on its own filesystem.
   * The caller is responsible for closing the stream.
   */
  public static InputStream openStream(Path path, Configuration conf) throws IOException {
    FileSystem fs = FileSystem.get(path.toUri(), conf);
    // Qualify against the filesystem's URI and working directory. The original
    // code qualified the path against itself (path.toUri(), path), which is a
    // no-op for absolute paths and wrong for relative ones.
    return fs.open(fs.makeQualified(path));
  }

  /**
   * Lists file statuses for {@code path}, either by glob expansion or by plain
   * directory listing, optionally filtered and sorted.
   *
   * @param ordering sort order for the result; unsorted if {@code null}
   * @return matching statuses; may be empty, never {@code null} for
   *         non-glob listings (glob may return {@code null} per Hadoop API)
   */
  public static FileStatus[] getFileStatus(Path path, PathType pathType, PathFilter filter,
                                           Comparator<FileStatus> ordering, Configuration conf) throws IOException {
    FileStatus[] statuses;
    FileSystem fs = path.getFileSystem(conf);
    if (filter == null) {
      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : listStatus(fs, path);
    } else {
      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : listStatus(fs, path, filter);
    }
    if (ordering != null) {
      Arrays.sort(statuses, ordering);
    }
    return statuses;
  }

  /** Like {@link FileSystem#listStatus(Path)} but returns an empty array for a missing path. */
  public static FileStatus[] listStatus(FileSystem fs, Path path) throws IOException {
    try {
      return fs.listStatus(path);
    } catch (FileNotFoundException e) {
      return new FileStatus[0];
    }
  }

  /** Like {@link FileSystem#listStatus(Path, PathFilter)} but returns an empty array for a missing path. */
  public static FileStatus[] listStatus(FileSystem fs, Path path, PathFilter filter) throws IOException {
    try {
      return fs.listStatus(path, filter);
    } catch (FileNotFoundException e) {
      return new FileStatus[0];
    }
  }

  /** Registers {@code fileToCache} as the sole DistributedCache file of {@code conf}. */
  public static void cacheFiles(Path fileToCache, Configuration conf) {
    DistributedCache.setCacheFiles(new URI[]{fileToCache.toUri()}, conf);
  }

  /**
   * Returns the first (expected to be the only) file in the DistributedCache.
   *
   * @throws IllegalStateException if no cached files are found
   */
  public static Path getSingleCachedFile(Configuration conf) throws IOException {
    return getCachedFiles(conf)[0];
  }

  /**
   * Resolves the files registered in the DistributedCache to local paths,
   * falling back to the original cache URIs when running locally (where no
   * localized copies exist).
   *
   * @return at least one path
   * @throws IllegalStateException if no cached files can be resolved
   */
  public static Path[] getCachedFiles(Configuration conf) throws IOException {
    LocalFileSystem localFs = FileSystem.getLocal(conf);
    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

    URI[] fallbackFiles = DistributedCache.getCacheFiles(conf);

    // fallback for local execution: no localized copies at all
    if (cacheFiles == null) {

      Preconditions.checkState(fallbackFiles != null, "Unable to find cached files!");

      cacheFiles = new Path[fallbackFiles.length];
      for (int n = 0; n < fallbackFiles.length; n++) {
        cacheFiles[n] = new Path(fallbackFiles[n].getPath());
      }
    } else {

      for (int n = 0; n < cacheFiles.length; n++) {
        cacheFiles[n] = localFs.makeQualified(cacheFiles[n]);
        // fallback for local execution: localized copy missing on disk.
        // Guard added: the original dereferenced fallbackFiles[n] unconditionally,
        // risking NPE (null fallback array) or AIOOBE (shorter fallback array).
        if (!localFs.exists(cacheFiles[n])) {
          Preconditions.checkState(fallbackFiles != null && n < fallbackFiles.length,
              "Unable to find cached files!");
          cacheFiles[n] = new Path(fallbackFiles[n].getPath());
        }
      }
    }

    Preconditions.checkState(cacheFiles.length > 0, "Unable to find cached files!");

    return cacheFiles;
  }

  /** Enables Java and Writable serialization for intermediate job data. */
  public static void setSerializations(Configuration configuration) {
    configuration.set("io.serializations",
        "org.apache.hadoop.io.serializer.JavaSerialization,"
        + "org.apache.hadoop.io.serializer.WritableSerialization");
  }

  /** Writes a single int to {@code path}, overwriting any existing file. */
  public static void writeInt(int value, Path path, Configuration configuration) throws IOException {
    FileSystem fs = FileSystem.get(path.toUri(), configuration);
    try (FSDataOutputStream out = fs.create(path)) {
      out.writeInt(value);
    }
  }

  /** Reads the single int previously written by {@link #writeInt}. */
  public static int readInt(Path path, Configuration configuration) throws IOException {
    FileSystem fs = FileSystem.get(path.toUri(), configuration);
    try (FSDataInputStream in = fs.open(path)) {
      return in.readInt();
    }
  }

  /**
   * Builds a comma-separated list of input splits: every directory under
   * {@code fileStatus} (recursively) that directly contains at least one file.
   *
   * @param fs - File System
   * @param fileStatus - File Status
   * @return list of directories as a comma-separated String
   * @throws IOException - IO Exception
   */
  public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
    boolean containsFiles = false;
    List<String> directoriesList = new ArrayList<>();
    // isDir() is deprecated in newer Hadoop (isDirectory()), kept for API compatibility.
    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
      if (childFileStatus.isDir()) {
        String subDirectoryList = buildDirList(fs, childFileStatus);
        directoriesList.add(subDirectoryList);
      } else {
        containsFiles = true;
      }
    }

    if (containsFiles) {
      directoriesList.add(fileStatus.getPath().toUri().getPath());
    }
    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
  }

  /**
   * Builds a comma-separated list of input splits, applying {@code pathFilter}
   * at every level of the recursion.
   *
   * @param fs - File System
   * @param fileStatus - File Status
   * @param pathFilter - path filter
   * @return list of directories as a comma-separated String
   * @throws IOException - IO Exception
   */
  public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException {
    boolean containsFiles = false;
    List<String> directoriesList = new ArrayList<>();
    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) {
      if (childFileStatus.isDir()) {
        // Fix: recurse with the filter; the original called the filterless
        // overload, so the filter silently only applied at the top level.
        String subDirectoryList = buildDirList(fs, childFileStatus, pathFilter);
        directoriesList.add(subDirectoryList);
      } else {
        containsFiles = true;
      }
    }

    if (containsFiles) {
      directoriesList.add(fileStatus.getPath().toUri().getPath());
    }
    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
  }

  /**
   * Computes {@code filePath} relative to the {@code baseinputpath}
   * configuration value.
   *
   * @param configuration - configuration; must define {@code baseinputpath}
   * @param filePath - Input File Path
   * @return relative file Path
   * @throws IOException - IO Exception
   * @throws IllegalStateException if {@code baseinputpath} is not set
   */
  public static String calcRelativeFilePath(Configuration configuration, Path filePath) throws IOException {
    FileSystem fs = filePath.getFileSystem(configuration);
    FileStatus fst = fs.getFileStatus(filePath);
    String currentPath = fst.getPath().toString().replaceFirst("file:", "");

    String basePath = configuration.get("baseinputpath");
    // Explicit check: the original NPE'd on endsWith() with no useful message.
    Preconditions.checkState(basePath != null, "baseinputpath is not set in the configuration");
    if (!basePath.endsWith("/")) {
      basePath += "/";
    }
    basePath = basePath.replaceFirst("file:", "");
    // Fix: split() takes a regex, so the base path must be quoted or any
    // metacharacter in it (e.g. '+', '(') breaks the match.
    String[] parts = currentPath.split(Pattern.quote(basePath));

    if (parts.length == 2) {
      return parts[1];
    } else if (parts.length == 1) {
      return parts[0];
    }
    return currentPath;
  }

  /**
   * Finds a file in the DistributedCache
   *
   * @param partOfFilename a substring of the file name
   * @param localFiles holds references to files stored in distributed cache
   * @return Path to first matched file or null if nothing was found
   **/
  public static Path findInCacheByPartOfFilename(String partOfFilename, URI[] localFiles) {
    for (URI distCacheFile : localFiles) {
      log.info("trying find a file in distributed cache containing [{}] in its name", partOfFilename);
      if (distCacheFile != null && distCacheFile.toString().contains(partOfFilename)) {
        log.info("found file [{}] containing [{}]", distCacheFile.toString(), partOfFilename);
        return new Path(distCacheFile.getPath());
      }
    }
    return null;
  }
}