One day, I took over a program a colleague had written for copying data from one Hadoop cluster to another. It runs as a job on the Hadoop cluster, with only a map phase: it reads the data under an HDFS directory and writes it to the other cluster.
Clearly, the program had not been written with large data volumes in mind. If the input directory contains many files or a lot of data, the job ends up with a huge number of maps. In fact, one of the data sources we needed to copy was close to 6 TB; the job launched with more than 10,000 maps and instantly filled up the whole queue's resources. Tuning a few parameters can roughly control the number of maps (that is, the concurrency), but it cannot control it precisely, and switching to another data source means re-tuning the parameters all over again.
The first improved version added a Reduce phase, hoping to control concurrency by setting the number of reducers. That does give precise control over the concurrency, but it also adds a shuffle phase. In practice the input data turned out to be skewed (and the partition key could not be changed for business reasons), so the network on some machines was saturated, which in turn affected other applications on the cluster. Even throttling the shuffle with the mapred.reduce.parallel.copies parameter only treats the symptom, not the cause. The extra shuffle phase simply wastes a lot of network bandwidth and I/O.
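For reference, the driver of this reduce-based version would have looked roughly like the sketch below. This is a minimal reconstruction, not the original code: the class name, the reducer count of 20, the copier limit of 5, and the use of the default identity map/reduce are all illustrative placeholders; only setNumReduceTasks() and mapred.reduce.parallel.copies are the knobs discussed above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CopyDriverV1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // cap the parallel copier threads in the shuffle (symptom relief only)
        conf.setInt("mapred.reduce.parallel.copies", 5);

        Job job = Job.getInstance(conf, "cluster-copy-v1");
        job.setJarByClass(CopyDriverV1.class);
        // the real copy mapper/reducer are omitted; the default identity map/reduce
        // is enough to show that the reducer count is the only precise concurrency knob
        job.setNumReduceTasks(20);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Exactly 20 reducers means exactly 20 concurrent writers, but every byte now has to travel through the shuffle first.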
Ideally, the job would have only a map phase and still allow the concurrency to be controlled precisely.
So the second, optimized version was born. This job has only a map phase and uses CombineFileInputFormat, which can pack multiple small files into one InputSplit and hand it to a single map, avoiding the flood of maps that a large number of small files would otherwise trigger. The mapred.max.split.size parameter gives rough control over the concurrency. I thought that would settle it, but data skew showed up again: this coarse way of carving out splits leaves some maps with very little data and others with a lot. A few straggler maps more than doubled the job's actual running time.
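A minimal driver sketch for this second version, under the assumption that the job can use Hadoop's stock org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat (shipped with newer 2.x releases; otherwise a thin CombineFileInputFormat subclass like the one shown later is needed). The 1 GB split size and the paths are illustrative only, and newer Hadoop maps the deprecated mapred.max.split.size key to mapreduce.input.fileinputformat.split.maxsize.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CopyDriverV2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // each InputSplit (hence each map) gets at most ~1 GB of input,
        // so the map count is roughly totalInputSize / 1 GB -- only a rough bound
        conf.setLong("mapred.max.split.size", 1024L * 1024 * 1024);

        Job job = Job.getInstance(conf, "cluster-copy-v2");
        job.setJarByClass(CopyDriverV2.class);
        // pack many small SequenceFiles into one split; map-only, no shuffle
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

The split size only bounds how much data one map may receive; it says nothing about how evenly the files are spread across splits, which is exactly where the skew came from.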
Apparently the only clean fix was to make every map process the same amount of data.
Hence the third version: this time CombineFileInputFormat is subclassed and getSplits is implemented from scratch. Since the input data is in SequenceFile format, a SequenceFileRecordReaderWrapper class is also needed.
The implementation code is as follows:
 CustomCombineSequenceFileInputFormat.java
```java
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

/**
 * Input format that is a <code>CombineFileInputFormat</code>-equivalent for
 * <code>SequenceFileInputFormat</code>.
 *
 * @see CombineFileInputFormat
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CustomCombineSequenceFileInputFormat<K, V> extends MultiFileInputFormat<K, V> {

    @SuppressWarnings({"rawtypes", "unchecked"})
    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, context,
                SequenceFileRecordReaderWrapper.class);
    }

    /**
     * A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be
     * used in a <code>CombineFileInputFormat</code>-equivalent for
     * <code>SequenceFileInputFormat</code>.
     *
     * @see CombineFileRecordReader
     * @see CombineFileInputFormat
     * @see SequenceFileInputFormat
     */
    private static class SequenceFileRecordReaderWrapper<K, V>
            extends CombineFileRecordReaderWrapper<K, V> {
        // this constructor signature is required by CombineFileRecordReader
        public SequenceFileRecordReaderWrapper(CombineFileSplit split, TaskAttemptContext context,
                Integer idx) throws IOException, InterruptedException {
            super(new SequenceFileInputFormat<K, V>(), split, context, idx);
        }
    }
}
```
MultiFileInputFormat.java
```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Multiple files can be combined in one InputSplit so that the InputSplit number can be limited!
 */
public abstract class MultiFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {

    private static final Log LOG = LogFactory.getLog(MultiFileInputFormat.class);

    public static final String CONFNAME_INPUT_SPLIT_MAX_NUM = "multifileinputformat.max_split_num";
    public static final Integer DEFAULT_MAX_SPLIT_NUM = 50;

    public static void setMaxInputSplitNum(Job job, Integer maxSplitNum) {
        job.getConfiguration().setInt(CONFNAME_INPUT_SPLIT_MAX_NUM, maxSplitNum);
    }

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // get all the files in the input path
        List<FileStatus> stats = listStatus(job);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        if (stats.size() == 0) {
            // nothing to do for an empty input directory
            return splits;
        }
        // compute the total length of the input
        long totalLen = 0;
        for (FileStatus stat : stats) {
            totalLen += stat.getLen();
        }
        // every split should carry roughly averageLen bytes
        int maxSplitNum = job.getConfiguration().getInt(CONFNAME_INPUT_SPLIT_MAX_NUM, DEFAULT_MAX_SPLIT_NUM);
        int expectSplitNum = maxSplitNum < stats.size() ? maxSplitNum : stats.size();
        long averageLen = totalLen / expectSplitNum;
        LOG.info("Prepare InputSplit : averageLen(" + averageLen + ") totalLen(" + totalLen
                + ") expectSplitNum(" + expectSplitNum + ") ");
        // pack whole files into a split until it exceeds the average length
        List<Path> pathLst = new ArrayList<Path>();
        List<Long> offsetLst = new ArrayList<Long>();
        List<Long> lengthLst = new ArrayList<Long>();
        long currentLen = 0;
        for (int i = 0; i < stats.size(); i++) {
            FileStatus stat = stats.get(i);
            pathLst.add(stat.getPath());
            offsetLst.add(0L);
            lengthLst.add(stat.getLen());
            currentLen += stat.getLen();
            if (splits.size() < expectSplitNum - 1 && currentLen > averageLen) {
                Path[] pathArray = new Path[pathLst.size()];
                CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
                        getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
                LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                        + ") length(" + currentLen + ")");
                splits.add(thissplit);
                // start collecting files for the next split
                pathLst.clear();
                offsetLst.clear();
                lengthLst.clear();
                currentLen = 0;
            }
        }
        // the remaining files go into the last split
        if (pathLst.size() > 0) {
            Path[] pathArray = new Path[pathLst.size()];
            CombineFileSplit thissplit =
                    new CombineFileSplit(pathLst.toArray(pathArray), getLongArray(offsetLst),
                            getLongArray(lengthLst), new String[0]);
            LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
                    + ") length(" + currentLen + ")");
            splits.add(thissplit);
        }
        return splits;
    }

    private long[] getLongArray(List<Long> lst) {
        long[] rst = new long[lst.size()];
        for (int i = 0; i < lst.size(); i++) {
            rst[i] = lst.get(i);
        }
        return rst;
    }
}
```
With the multifileinputformat.max_split_num parameter, the number of maps can now be controlled fairly accurately, and each map ends up processing a very even share of the data. With that, the problem was finally solved.
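For completeness, here is a sketch of how the two classes above might be wired into the map-only copy job. The driver class name, the paths, and the cap of 50 splits are illustrative, and the real mapper that writes to the remote cluster is omitted.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CopyDriverV3 {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "cluster-copy-v3");
        job.setJarByClass(CopyDriverV3.class);
        // map-only job: the custom input format alone decides how many maps run
        job.setNumReduceTasks(0);
        job.setInputFormatClass(CustomCombineSequenceFileInputFormat.class);
        // at most 50 splits -> at most 50 maps, each holding roughly totalLen / 50 bytes
        MultiFileInputFormat.setMaxInputSplitNum(job, 50);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```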