http://jeffxie.blog.51cto.com/1365360/305538
我在
Hadoop的用户邮件列表中看到一些国内的用 户在讯问一些关于如何操作的
HBase的
问题,还看到了HBase中没有Example。觉得有 必要跟大家
分享自己的经验。
 在下面的例子中我们分析
Apache的log并把这些log进行分析并把分析完 的结果按用户IP为ROW,把log中用户的访问时间,请求方法,用户请求的协议,用户的浏览器,服务状态等写到HBase的表中。  
 首先我们要在HBase中建立我们的一个表来存储
数据。  
- public static void  creatTable(String table) throws IOException{
 
-             HConnection conn =  HConnectionManager.getConnection(conf);
 
-             HBaseAdmin admin = new HBaseAdmin(conf);
 
-             if(!admin.tableExists(new Text(table))){
 
-               System.out.println("1. " + table + " table  creating ... please wait");
 
-               HTableDescriptor tableDesc = new  HTableDescriptor(table);
 
-               tableDesc.addFamily(new  HColumnDescriptor("http:"));
 
-               tableDesc.addFamily(new  HColumnDescriptor("url:"));
 
-               tableDesc.addFamily(new  HColumnDescriptor("referrer:"));
 
-               admin.createTable(tableDesc);
 
-             } else {
 
-               System.out.println("1. " + table + " table  already exists.");
 
-             }
 
-             System.out.println("2. access_log files fetching using  map/reduce");
 
-   }
 然后我们
运行一个MapReduce任务来取得log中的每一行 数据。因为我们只要取得数据而不需要对结果进行规约,我们只要编写一个Map
程序即可。    
- public static class  MapClass extends MapReduceBase implements
 
-       Mapper<WritableComparable, Text, Text, Writable> {
 
-  
 
-     @Override
 
-     public void configure(JobConf job) {
 
-       tableName = job.get(TABLE, "");
 
-     }
 
-  
 
-     public void map(WritableComparable key, Text value,
 
-         OutputCollector<Text, Writable> output, Reporter  reporter)
 
-         throws IOException {
 
-       try {
 
-              AccessLogParser log = new  AccessLogParser(value.toString());
 
-         if(table==null)
 
-                 table = new HTable(conf, new Text(tableName));
 
-         long lockId = table.startUpdate(new Text(log.getIp()));
 
-         table.put(lockId, new Text("http:protocol"),  log.getProtocol().getBytes());
 
-         table.put(lockId, new Text("http:method"),  log.getMethod().getBytes());
 
-         table.put(lockId, new Text("http:code"),  log.getCode().getBytes());
 
-         table.put(lockId, new Text("http:bytesize"),  log.getByteSize().getBytes());
 
-         table.put(lockId, new Text("http:agent"),  log.getAgent().getBytes());
 
-         table.put(lockId, new Text("url:" + log.getUrl()),  log.getReferrer().getBytes());
 
-         table.put(lockId, new Text("referrer:" +  log.getReferrer()), log.getUrl().getBytes());
 
-  
 
-         table.commit(lockId, log.getTimestamp());
 
-       } catch (ParseException e) {
 
-         e.printStackTrace();
 
-       } catch (Exception e) {
 
-         e.printStackTrace();
 
-       }
 
-     }
 
-   }
 我们在Map程序中对于传进来的每一行先交给AccessLogParser去处理在AccessLogParser德构造器中用一个正则表达式"([^  ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\] \"([^\"]*)\" " ([^ ]*) ([^ ]*)  \"([^\"]*)\"  \"([^\"]*)\".*"来匹配每一行的log。接下来我们把这些AccessLogParser处理出来的结果更新到HBase的表中去,好的, 我们的程序写完了。我们要启动一个MapReduce的话我们要对工作进行配置。    
- public static void  runMapReduce(String table,String dir) throws IOException{
 
-           Path tempDir = new Path("log/temp");
 
-           Path InputDir = new Path(dir);
 
-           FileSystem fs = FileSystem.get(conf);
 
-           JobConf jobConf = new JobConf(conf,  LogFetcher.class);
 
-           jobConf.setJobName("apache log fetcher");
 
-           jobConf.set(TABLE, table);
 
-           Path[] in = fs.listPaths(InputDir);
 
-           if (fs.isFile(InputDir)) {
 
-               jobConf.setInputPath(InputDir);
 
-           } else {
 
-               for (int i = 0; i < in.length; i++) {
 
-                 if (fs.isFile(in[i])) {
 
-                   jobConf.addInputPath(in[i]);
 
-                 } else {
 
-                   Path[] sub = fs.listPaths(in[i]);
 
-                   for (int j = 0; j < sub.length; j++) {
 
-                     if (fs.isFile(sub[j])) {
 
-                       jobConf.addInputPath(sub[j]);
 
-                     }
 
-                   }
 
-                 }
 
-               }
 
-             }
 
-             jobConf.setOutputPath(tempDir);
 
-             jobConf.setMapperClass(MapClass.class);
 
-  
 
-             JobClient client = new JobClient(jobConf);
 
-             ClusterStatus cluster =  client.getClusterStatus();
 
-             jobConf.setNumMapTasks(cluster.getMapTasks());
 
-             jobConf.setNumReduceTasks(0);
 
-  
 
-             JobClient.runJob(jobConf);
 
-             fs.delete(tempDir);
 
-             fs.close();
 
-   }
 在上面的代码中我们先产生一个jobConf对象,然后设定我们的InputPath和OutputPath,告诉MapReduce我们的Map类,设 定我们用多少个Map任务和Reduce任务,然后我们不任务提交给JobClient,关于MapReduce跟详细的
资料在
Hadoop  Wiki上。 
下载:源码和已编译好的
jar文件
example-src.tgz 例子的运行命令是: 
 bin/hadoop jar examples.jar logfetcher <access_log file or  directory> <table_name> 
 如何运行上面的应用程序呢?我们假定解压缩完Hadoop分发包的目录为%HADOOP%
 拷贝%HADOOP%\contrib\hbase\bin下的文件到%HADOOP%\bin下,拷贝%HADOOP%\contrib\hbase \conf的文件到%HADOOP%\conf下,拷贝%HADOOP%\src\contrib\hbase\lib的文件到%HADOOP%\lib 下,拷贝%HADOOP%\src\contrib\hbase\hadoop-*-hbase.jar的文件到%HADOOP%\lib下.然后编辑配 置文件hbase-site.xml设定你的hbase.master例子:192.168.2.92:60000。把这些文件分发到运行Hadoop的 机器上去。在regionservers文件添加上这些已分发过的地址。运行bin/start-hbase.sh命令启动HBase,把你的 apache log文件拷贝到HDFS的apache-log目录下,等启动完成后运行下面的命令。 
 bin/hadoop jar examples.jar logfetcher apache-log apache 
 访问
http://localhost:50030/能 看到你的MapReduce任务的运行情况,访问
http://localhost:60010/能 看到HBase的运行情况。   
 
    等任务MapReduce完成后访问
http://localhost:60010/hql.jsp,在Query输入框中输入 SELECT * FROM apache limit=50;。将会看到已经插入表中的数据。  

posted on 2013-02-22 14:12 
SIMONE 阅读(2492) 
评论(0)  编辑  收藏  所属分类: 
hbase