paulwong

WordCount的一个变种版本…Hadoop

统计域名(实际是host)的计数器。

输入:一个文件夹中有一堆的文本文件,内容是一行一个的url,可以想像为数据库中的一条记录
流程:提取url的domain,对domain计数+1
输出:域名,域名计数

代码如下:
Mapper
package com.keseek.hadoop;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Mapper;

public class DomainCountMapper implements
        Mapper
<LongWritable, Text, Text, LongWritable> {

    @Override
   
public void configure(JobConf arg0) {
       
// Init Text and LongWritable
        domain = new Text();
        one
= new LongWritable(1);
    }


    @Override
   
public void close() throws IOException {
       
// TODO Auto-generated method stub
    }


    @Override
   
public void map(LongWritable key, Text value,
            OutputCollector
<Text, LongWritable> output, Reporter reporter)
           
throws IOException {
       
// Get URL
        String url = value.toString().trim();

       
// URL->Domain && Collect
        domain.set(ParseDomain(url));
       
if (domain.getLength() != 0) {
            output.collect(domain, one);
        }


    }


   
public String ParseDomain(String url) {
       
try {
            URI uri
= URI.create(url);
           
return uri.getHost();
        }
catch (Exception e) {
           
return "";
        }

    }


   
// Shared used Text domain
    private Text domain;

   
// One static
    private LongWritable one;

}

Reducer

package com.keseek.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Reducer;

public class DomainCountReducer implements
        Reducer
<Text, LongWritable, Text, LongWritable> {

    @Override
   
public void configure(JobConf arg0) {
       
// TODO Auto-generated method stub

    }


    @Override
   
public void close() throws IOException {
       
// TODO Auto-generated method stub

    }


    @Override
   
public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector
<Text, LongWritable> output, Reporter reporter)
           
throws IOException {
       
// Count the domain
        long cnt = 0;
       
while (values.hasNext()) {
            cnt
+= values.next().get();
        }

       
// Output
        output.collect(key, new LongWritable(cnt));
    }


}

Main

package com.keseek.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class DomainCountMain {
   
public static void main(String[] args) throws Exception {
       
// Param for path
        if (args.length != 2) {
            System.out.println(
"Usage:");
            System.out
                    .println(
"DomainCountMain.jar  <Input_Path>  <Outpu_Path>");
            System.exit(
-1);
        }


       
// Configure JobConf
        JobConf jobconf = new JobConf(DomainCountMain.class);

        jobconf.setJobName(
"Domain Counter by Coder4");

        FileInputFormat.setInputPaths(jobconf,
new Path(args[0]));
       FileOutputFormat.setOutputPath(jobconf,
new Path(args[1]));

        jobconf.setInputFormat(TextInputFormat.
class);
       jobconf.setOutputFormat(TextOutputFormat.
class);

        jobconf.setMapperClass(DomainCountMapper.
class);
        jobconf.setReducerClass(DomainCountReducer.
class);
       jobconf.setCombinerClass(DomainCountReducer.
class);

        jobconf.setMapOutputKeyClass(Text.
class);
        jobconf.setMapOutputValueClass(LongWritable.
class);
        jobconf.setOutputKeyClass(Text.
class);
        jobconf.setOutputValueClass(LongWritable.
class);

       
// Run job
        RunningJob run = JobClient.runJob(jobconf);
        run.waitForCompletion();
       
if (run.isSuccessful()) {
            System.out.println(
"<<<DomainCount Main>>> success.");
        }
else {
            System.out.println(
"<<<DomainCount Main>>> error.");
        }

    }

}

posted on 2012-09-08 15:30 paulwong 阅读(258) 评论(0)  编辑  收藏 所属分类: HADOOP云计算


只有注册用户登录后才能发表评论。


网站导航: