posts - 496,comments - 227,trackbacks - 0

BTW:再次感叹下没有机器, 3.4G的语料,单机处理了10来个小时, 真是郁闷~~ 要是有N台机器多好啊.

 

在很多时候,特别是处理大数据的时候,我们希望一道MapReduce过程就可以解决几个问题。这样可以避免再次读取数据。比如:在做文本聚类/分 类的时候,mapper读取语料,进行分词后,要同时算出每个词条(term)的term frequency以及它的document frequency. 前者对于每个词条来说其实是个向量, 它代表此词条在N篇文档各中的词频;而后者就是一个非负整数。 这时候就可以借助一种特殊的Writable类:GenericWritable.

 

用法是:继承这个类,然后把你要输出value的Writable类型加进它的CLASSES静态变量里,在后面的TermMapper和 TermReducer中我的value使用了三种ArrayWritable,IntWritable和我自已定义的TFWritable,所以要把三 者全加入TermWritable的CLASSES中。

Java代码  收藏代码
  1. package redpoll.examples;  
  2.   
  3. import org.apache.hadoop.io.GenericWritable;  
  4. import org.apache.hadoop.io.Writable;  
  5.   
  6. /** 
  7.  * Generic Writable class for terms. 
  8.  * @author Jeremy Chow(coderplay@gmail.com) 
  9.  */  
  10. public class TermWritable extends GenericWritable {  
  11.   private static Class<? extends Writable>[] CLASSES = null;  
  12.   
  13.   static {  
  14.     CLASSES = (Class<? extends Writable>[]) new Class[] {  
  15.         org.apache.hadoop.io.ArrayWritable.class,  
  16.         org.apache.hadoop.io.IntWritable.class,  
  17.         redpoll.examples.TFWritable.class  
  18.         };  
  19.   }  
  20.   
  21.   public TermWritable() {  
  22.   }  
  23.   
  24.   public TermWritable(Writable instance) {  
  25.     set(instance);  
  26.   }  
  27.   
  28.   @Override  
  29.   protected Class<? extends Writable>[] getTypes() {  
  30.     return CLASSES;  
  31.   }  
  32. }  

 Mapper在collect数据时,用刚才定义的TermWritable来包装(wrap)要使用的Writable类。

Java代码  收藏代码
  1. package redpoll.examples;  
  2.   
  3. import java.io.IOException;  
  4. import java.io.StringReader;  
  5.   
  6. import org.apache.commons.logging.Log;  
  7. import org.apache.commons.logging.LogFactory;  
  8. import org.apache.hadoop.io.IntWritable;  
  9. import org.apache.hadoop.io.LongWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.mapred.JobConf;  
  12. import org.apache.hadoop.mapred.MapReduceBase;  
  13. import org.apache.hadoop.mapred.Mapper;  
  14. import org.apache.hadoop.mapred.OutputCollector;  
  15. import org.apache.hadoop.mapred.Reporter;  
  16. import org.apache.lucene.analysis.Analyzer;  
  17. import org.apache.lucene.analysis.Token;  
  18. import org.apache.lucene.analysis.TokenStream;  
  19. import org.apache.lucene.analysis.standard.StandardAnalyzer;  
  20.   
  21. /** 
  22.  * A class provides for doing words segmenation and counting term TFs and DFs.<p> 
  23.  * in: key is document id, value is a document instance. <br> 
  24.  * output: 
  25.  * <li>key is term, value is a <documentId, tf> pair</li> 
  26.  * <li>key is term, value is document frequency corresponsing to the key</li> 
  27.  * @author Jeremy Chow(coderplay@gmail.com) 
  28.  */  
  29. public class TermMapper extends MapReduceBase implements  
  30.     Mapper<LongWritable, Document, Text, TermWritable> {  
  31.   private static final Log log = LogFactory.getLog(TermMapper.class  
  32.       .getName());  
  33.     
  34.   /* analyzer for words segmentation */  
  35.   private Analyzer analyzer = null;  
  36.      
  37.   /* frequency weight for document title */  
  38.   private IntWritable titleWeight = new IntWritable(2);  
  39.   /* frequency weight for document content */  
  40.   private IntWritable contentWeight = new IntWritable(1);  
  41.   
  42.     
  43.   public void map(LongWritable key, Document value,  
  44.       OutputCollector<Text, TermWritable> output, Reporter reporter)  
  45.       throws IOException {  
  46.     doMap(key, value.getTitle(), titleWeight, output, reporter);  
  47.     doMap(key, value.getContent(), contentWeight, output, reporter);  
  48.   }  
  49.     
  50.   private void doMap(LongWritable key, String value, IntWritable weight,  
  51.       OutputCollector<Text, TermWritable> output, Reporter reporter)  
  52.       throws IOException {  
  53.     // do words segmentation  
  54.     TokenStream ts = analyzer.tokenStream("dummy", new StringReader(value));  
  55.     Token token = new Token();  
  56.     while ((token = ts.next(token)) != null) {  
  57.       String termString = new String(token.termBuffer(), 0, token.termLength());  
  58.       Text term = new Text(termString);  
  59.       // <term, <documentId,tf>>  
  60.       TFWritable tf = new TFWritable(key, weight);  
  61.       output.collect(term, new TermWritable(tf)); // wrap then collect  
  62.       // <term, weight>  
  63.       output.collect(term, new TermWritable(weight)); // wrap then collect  
  64.     }  
  65.   }  
  66.       
  67.   @Override  
  68.   public void configure(JobConf job) {  
  69.     String analyzerName = job.get("redpoll.text.analyzer");  
  70.     try {  
  71.       if (analyzerName != null)  
  72.         analyzer = (Analyzer) Class.forName(analyzerName).newInstance();  
  73.     } catch (Exception excp) {  
  74.       excp.printStackTrace();  
  75.     }  
  76.     if (analyzer == null)  
  77.       analyzer = new StandardAnalyzer();  
  78.   }  
  79.   
  80. }  
 

Reduce如果想获取数据,则可以解包(unwrap)它:

Java代码  收藏代码
  1. package redpoll.examples;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.ArrayList;  
  5. import java.util.Iterator;  
  6.   
  7. import org.apache.commons.logging.Log;  
  8. import org.apache.commons.logging.LogFactory;  
  9. import org.apache.hadoop.io.ArrayWritable;  
  10. import org.apache.hadoop.io.IntWritable;  
  11. import org.apache.hadoop.io.Text;  
  12. import org.apache.hadoop.io.Writable;  
  13. import org.apache.hadoop.mapred.MapReduceBase;  
  14. import org.apache.hadoop.mapred.OutputCollector;  
  15. import org.apache.hadoop.mapred.Reducer;  
  16. import org.apache.hadoop.mapred.Reporter;  
  17.   
  18. /** 
  19.  * Form a tf vector and caculate the df for terms. 
  20.  * @author Jeremy Chow(coderplay@gmail.com) 
  21.  */  
  22. public class TermReducer extends MapReduceBase implements Reducer<Text, TermWritable, Text, Writable> {  
  23.     
  24.   private static final Log log = LogFactory.getLog(TermReducer.class.getName());  
  25.     
  26.   public void reduce(Text key, Iterator<TermWritable> values,  
  27.       OutputCollector<Text, Writable> output, Reporter reporter)  
  28.       throws IOException {  
  29.     ArrayList<TFWritable> tfs = new ArrayList<TFWritable>();  
  30.     int sum = 0;  
  31. //    log.info("term:" + key.toString());  
  32.     while (values.hasNext()) {  
  33.       Writable value = values.next().get(); // unwrap  
  34.       if (value  instanceof TFWritable) {  
  35.         tfs.add((TFWritable) value );   
  36.       }else {  
  37.         sum += ((IntWritable) value).get();  
  38.       }  
  39.     }  
  40.       
  41.     TFWritable writables[] = new TFWritable[tfs.size()];  
  42.     ArrayWritable aw = new ArrayWritable(TFWritable.class, tfs.toArray(writables));  
  43.     // wrap again  
  44.     output.collect(key, new TermWritable(aw));   
  45.     output.collect(key, new TermWritable(new IntWritable(sum)));  
  46.   }  
  47.   
  48. }  

 这儿collect的时候可以不再用TermWritable,只不过我在重新定义了OutputFormat,让它输出到两个不同的文件,而且输出的类型也是不一样的。

 

posted on 2014-11-11 14:10 SIMONE 阅读(231) 评论(0)  编辑  收藏 所属分类: JAVA

只有注册用户登录后才能发表评论。


网站导航: