统计域名(实际是host)的计数器。
输入:一个文件夹中有一堆的文本文件,内容是一行一个的url,可以想像为数据库中的一条记录
流程:提取url的domain,对domain计数+1
输出:域名,域名计数
代码如下:
Mapper
package com.keseek.hadoop;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Mapper;

public class DomainCountMapper implements

        Mapper<LongWritable, Text, Text, LongWritable> 
{

    @Override

    public void configure(JobConf arg0) 
{
        // Init Text and LongWritable
        domain = new Text();
        one = new LongWritable(1);
    }

    @Override

    public void close() throws IOException 
{
        // TODO Auto-generated method stub
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)

            throws IOException 
{
        // Get URL
        String url = value.toString().trim();

        // URL->Domain && Collect
        domain.set(ParseDomain(url));

        if (domain.getLength() != 0) 
{
            output.collect(domain, one);
        }

    }


    public String ParseDomain(String url) 
{

        try 
{
            URI uri = URI.create(url);
            return uri.getHost();

        } catch (Exception e) 
{
            return "";
        }
    }

    // Reusable output key: allocated in configure() and overwritten by map() for each record
    private Text domain;

    // Count of 1 emitted with every host; an instance field allocated in configure(), not a static
    private LongWritable one;

}

Reducer
package com.keseek.hadoop;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Reducer;

public class DomainCountReducer implements

        Reducer<Text, LongWritable, Text, LongWritable> 
{

    @Override

    public void configure(JobConf arg0) 
{
        // TODO Auto-generated method stub

    }

    @Override

    public void close() throws IOException 
{
        // TODO Auto-generated method stub

    }

    @Override
    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter)

            throws IOException 
{
        // Count the domain
        long cnt = 0;

        while (values.hasNext()) 
{
            cnt += values.next().get();
        }
        // Output
        output.collect(key, new LongWritable(cnt));
    }

}

Main
package com.keseek.hadoop;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;


public class DomainCountMain 
{

    public static void main(String[] args) throws Exception 
{
        // Param for path

        if (args.length != 2) 
{
            System.out.println("Usage:");
            System.out
                    .println("DomainCountMain.jar  <Input_Path>  <Outpu_Path>");
            System.exit(-1);
        }

        // Configure JobConf
        JobConf jobconf = new JobConf(DomainCountMain.class);

        jobconf.setJobName("Domain Counter by Coder4");

        FileInputFormat.setInputPaths(jobconf, new Path(args[0]));
              FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));

        jobconf.setInputFormat(TextInputFormat.class);
              jobconf.setOutputFormat(TextOutputFormat.class);

        jobconf.setMapperClass(DomainCountMapper.class);
        jobconf.setReducerClass(DomainCountReducer.class);
              jobconf.setCombinerClass(DomainCountReducer.class);

        jobconf.setMapOutputKeyClass(Text.class);
        jobconf.setMapOutputValueClass(LongWritable.class);
        jobconf.setOutputKeyClass(Text.class);
        jobconf.setOutputValueClass(LongWritable.class);

        // Run job
        RunningJob run = JobClient.runJob(jobconf);
        run.waitForCompletion();

        if (run.isSuccessful()) 
{
            System.out.println("<<<DomainCount Main>>> success.");

        } else 
{
            System.out.println("<<<DomainCount Main>>> error.");
        }
    }
}