随笔-311  评论-209  文章-0  trackbacks-0
  2018年9月3日
Sentry权限控制通过Beeline(Hiveserver2 SQL 命令行接口)输入Grant 和 Revoke语句来配置。语法跟现在的一些主流的关系数据库很相似。需要注意的是:当sentry服务启用后,我们必须使用beeline接口来执行hive查询,Hive Cli并不支持sentry。 CREATE ROLE Statement CREATE ROLE语句创建一个可以被赋权的角色。权限可以赋给角色,然后再分配给各个用户。一个用户被分配到角色后可以执行该角色的权限。 只有拥有管理员的角色可以create/drop角色。默认情况下,hive、impala和hue用户拥有管理员角色。 CREATE ROLE [role_name]; DROP ROLE Statement DROP ROLE语句可以用来从数据库中移除一个角色。一旦移除,之前分配给所有用户的该角色将会取消。之前已经执行的语句不会受到影响。但是,因为hive在执行每条查询语句之前会检查用户的权限,处于登录活跃状态的用户会话会受到影响。 DROP ROLE [role_name]; GRANT ROLE Statement GRANT ROLE语句可以用来给组授予角色。只有sentry的管理员用户才能执行该操作。 GRANT ROLE role_name [, role_name] TO GROUP (groupName) [,GROUP (groupName)] REVOKE ROLE Statement REVOKE ROLE语句可以用来从组移除角色。只有sentry的管理员用户才能执行该操作。 REVOKE ROLE role_name [, role_name] FROM GROUP (groupName) [,GROUP (groupName)] GRANT (PRIVILEGE) Statement 授予一个对象的权限给一个角色,该用户必须为sentry的管理员用户。 GRANT (PRIVILEGE) [, (PRIVILEGE) ] ON (OBJECT) (object_name) TO ROLE (roleName) [,ROLE (roleName)] REVOKE (PRIVILEGE) Statement 因为只有认证的管理员用户可以创建角色,从而只有管理员用户可以取消一个组的权限。 REVOKE (PRIVILEGE) [, (PRIVILEGE) ] ON (OBJECT) (object_name) FROM ROLE (roleName) [,ROLE (roleName)] GRANT (PRIVILEGE) ... WITH GRANT OPTION 在cdh5.2中,你可以委托给其他角色来授予和解除权限。比如,一个角色被授予了WITH GRANT OPTION的权限可以GRANT/REVOKE同样的权限给其他角色。因此,如果一个角色有一个库的所有权限并且设置了 WITH GRANT OPTION,该角色分配的用户可以对该数据库和其中的表执行GRANT/REVOKE语句。 GRANT (PRIVILEGE) ON (OBJECT) (object_name) TO ROLE (roleName) WITH GRANT OPTION 只有一个带GRANT选项的特殊权限的角色或者它的父级权限可以从其他角色解除这种权限。一旦下面的语句执行,所有跟其相关的grant权限将会被解除。 REVOKE (RIVILEGE) ON (BJECT) (bject_name) FROM ROLE (roleName) Hive目前不支持解除之前赋予一个角色 WITH GRANT OPTION 的权限。要想移除WITH GRANT OPTION、解除权限,可以重新去除 WITH GRANT OPTION这个标记来再次附权。 SET ROLE Statement SET ROLE语句可以给当前会话选择一个角色使之生效。一个用户只能启用分配给他的角色。任何不存在的角色和当前用户不能使用的角色是不能生效的。如果没有使用任何角色,用户将会使用任何一个属于他的角色的权限。 选择一个角色使用: To enable a specific role: 使用所有的角色: To enable a specific role: 关闭所有角色 SET ROLE NONE; SHOW Statement 显示当前用户拥有库、表、列相关权限的数据库: SHOW DATABASES; 显示当前用户拥有表、列相关权限的表; SHOW TABLES; 显示当前用户拥有SELECT权限的列: SHOW COLUMNS (FROM|IN) table_name [(FROM|IN) db_name]; 显示当前系统中所有的角色(只有管理员用户可以执行): SHOW ROLES; 显示当前影响当前会话的角色: SHOW CURRENT ROLES; 显示指定组的被分配到的所有角色(只有管理员用户和指定组内的用户可以执行) SHOW ROLE GRANT GROUP (groupName); SHOW语句可以用来显示一个角色被授予的权限或者显示角色的一个特定对象的所有权限。 显示指定角色的所有被赋予的权限。(只有管理员用户和指定角色分配到的用户可以执行)。下面的语句也会显示任何列级的权限。 SHOW GRANT ROLE (roleName); 显示指定对象的一个角色的所有被赋予的权限(只有管理员用户和指定角色分配到的用户可以执行)。下面的语句也会显示任何列级的权限。 SHOW GRANT ROLE (roleName) on (OBJECT) (objectName); ----------------------------我也是有底线的-----------------------------
posted @ 2018-09-03 18:19 xzc 阅读(37) | 评论 (0)编辑 收藏
  2018年5月18日
     摘要: Python 里面的编码和解码也就是 unicode 和 str 这两种形式的相互转化。编码是 unicode -> str,相反的,解码就是 str -> unicode。剩下的问题就是确定何时需要进行编码或者解码了.关于文件开头的"编码指示",也就是 # -*- codin...  阅读全文
posted @ 2018-05-18 09:52 xzc 阅读(73) | 评论 (0)编辑 收藏
  2018年3月8日
一、前言
    早上醒来打开微信,同事反馈kafka集群从昨天凌晨开始写入频繁失败,赶紧打开电脑查看了kafka集群的机器监控,日志信息,发现其中一个节点的集群负载从昨天凌晨突然掉下来了,和同事反馈的时间点大概一致,于是乎就登录服务器开始干活。
二、排错
1、查看机器监控,看是否能大概定位是哪个节点有异常
技术分享
2、根据机器监控大概定位到其中一个异常节点,登录服务器查看kafka日志,发现有报错日志,并且日志就停留在这个这个时间点:
[2017-06-01 16:59:59,851] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:658)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:108)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at kafka.network.Processor.run(SocketServer.scala:413)3、查看kafka进程和监听端口情况,发现都正常,尼玛假死了
ps -ef |grep kafka        ## 查看kafka的进程
netstat -ntlp |grep 9092  ##9092kafka的监听端口4、既然已经假死了,只能重启了
ps -ef |grep kafka |grep -v grep |awk ‘{print $2}‘  | xargs kill -9  
/usr/local/kafka/bin;nohup ./kafka-server-start.sh ../config/server.properties &5、重启后在观察该节点的kafka日志,在一顿index重建之后,上面的报错信息在疯狂的刷,最后谷歌一番,解决了该问题
三、解决方案:
/usr/local/kafka/binkafka-run-class.sh去掉
-XX:+DisableExplicitGC添加
-XX:MaxDirectMemorySize=512m在一次重启kafka,问题解决。
posted @ 2018-03-08 16:35 xzc 阅读(455) | 评论 (0)编辑 收藏
  2018年3月7日
     摘要: 我们每次执行hive的hql时,shell里都会提示一段话:[python] view plaincopy...  Number of reduce tasks not specified. Estimated from input data size: 50...  阅读全文
posted @ 2018-03-07 11:21 xzc 阅读(302) | 评论 (1)编辑 收藏
  2017年10月23日
     摘要: spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数 1  应用场景:   1、我们需要统计用户的总使用时长(累加历史)   2、前台展现页面需要对多个维度进行查询,如:产品、地区等等   3、需要展现的表格头如: 产品、2015-04、2015-05、2015-06 2 原始数据: product_code |event_date |dur...  阅读全文
posted @ 2017-10-23 22:05 xzc 阅读(238) | 评论 (0)编辑 收藏
     摘要: Spark1.4发布,支持了窗口分析函数(window functions)。在离线平台中,90%以上的离线分析任务都是使用Hive实现,其中必然会使用很多窗口分析函数,如果SparkSQL支持窗口分析函数, 那么对于后面Hive向SparkSQL中的迁移的工作量会大大降低,使用方式如下: 1、初始化数据 创建表 [sql] view plain cop...  阅读全文
posted @ 2017-10-23 22:04 xzc 阅读(116) | 评论 (0)编辑 收藏

SparkSQL相关语句总结

1.in 不支持子查询 eg. select * from src where key in(select key from test);
支持查询个数 eg. select * from src where key in(1,2,3,4,5);
in 40000个 耗时25.766秒
in 80000个 耗时78.827秒

2.union all/union
不支持顶层的union all eg. select key from src UNION ALL select key from test;
支持select * from (select key from src union all select key from test)aa;
不支持 union
支持select distinct key from (select key from src union all select key from test)aa;

3.intersect 不支持

4.minus 不支持

5.except 不支持

6.inner join/join/left outer join/right outer join/full outer join/left semi join 都支持
left outer join/right outer join/full outer join 中间必须有outer
join是最简单的关联操作,两边关联只取交集;
left outer join是以左表驱动,右表不存在的key均赋值为null;
right outer join是以右表驱动,左表不存在的key均赋值为null;
full outer join全表关联,将两表完整的进行笛卡尔积操作,左右表均可赋值为null;
left semi join最主要的使用场景就是解决exist in;
Hive不支持where子句中的子查询,SQL常用的exist in子句在Hive中是不支持的
不支持子查询 eg. select * from src aa where aa.key in(select bb.key from test bb);
可用以下两种方式替换:
select * from src aa left outer join test bb on aa.key=bb.key where bb.key <> null;
select * from src aa left semi join test bb on aa.key=bb.key;
大多数情况下 JOIN ON 和 left semi on 是对等的
A,B两表连接,如果B表存在重复数据
当使用JOIN ON的时候,A,B表会关联出两条记录,应为ON上的条件符合; 
而是用LEFT SEMI JOIN 当A表中的记录,在B表上产生符合条件之后就返回,不会再继续查找B表记录了,
所以如果B表有重复,也不会产生重复的多条记录。 
left outer join 支持子查询 eg. select aa.* from src aa left outer join (select * from test111)bb on aa.key=bb.a;

7. hive四中数据导入方式
1)从本地文件系统中导入数据到Hive表
create table wyp(id int,name string) ROW FORMAT delimited fields terminated by '\t' STORED AS TEXTFILE;
load data local inpath 'wyp.txt' into table wyp;
2)从HDFS上导入数据到Hive表
[wyp@master /home/q/hadoop-2.2.0]$ bin/hadoop fs -cat /home/wyp/add.txt
hive> load data inpath '/home/wyp/add.txt' into table wyp;
3)从别的表中查询出相应的数据并导入到Hive表中
hive> create table test(
> id int, name string
> ,tel string)
> partitioned by
> (age int)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> STORED AS TEXTFILE;

注:test表里面用age作为了分区字段,分区:在Hive中,表的每一个分区对应表下的相应目录,所有分区的数据都是存储在对应的目录中。
比如wyp表有dt和city两个分区,则对应dt=20131218city=BJ对应表的目录为/user/hive/warehouse/dt=20131218/city=BJ,
所有属于这个分区的数据都存放在这个目录中。

hive> insert into table test
> partition (age='25')
> select id, name, tel
> from wyp;

也可以在select语句里面通过使用分区值来动态指明分区:
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert into table test
> partition (age)
> select id, name,
> tel, age
> from wyp;

Hive也支持insert overwrite方式来插入数据
hive> insert overwrite table test
> PARTITION (age)
> select id, name, tel, age
> from wyp;

Hive还支持多表插入
hive> from wyp
> insert into table test
> partition(age)
> select id, name, tel, age
> insert into table test3
> select id, name
> where age>25;
4)在创建表的时候通过从别的表中查询出相应的记录并插入到所创建的表中
hive> create table test4
> as
> select id, name, tel
> from wyp;

8.查看建表语句
hive> show create table test3;

9.表重命名
hive> ALTER TABLE events RENAME TO 3koobecaf; 

10.表增加列
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT); 

11.添加一列并增加列字段注释 
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment'); 

12.删除表
hive> DROP TABLE pokes; 

13.top n
hive> select * from test order by key limit 10;
14.创建数据库
Create Database baseball;

14.alter table tablename  change oldColumn newColumn column_type 修改列的名称和类型

alter table yangsy CHANGE product_no phone_no string

 

15.导入.sql文件中的sql

 spark-sql --driver-class-path /home/hadoop/hive/lib/mysql-connector-java-5.1.30-bin.jar -f testsql.sql 


insert into table CI_CUSER_20141117154351522 select mainResult.PRODUCT_NO,dw_coclbl_m02_3848.L1_01_02_01,dw_coclbl_d01_3845.L2_01_01_04 from (select PRODUCT_NO from CI_CUSER_20141114203632267) mainResult left join DW_COCLBL_M02_201407 dw_coclbl_m02_3848 on mainResult.PRODUCT_NO = dw_coclbl_m02_3848.PRODUCT_NO left join DW_COCLBL_D01_20140515 dw_coclbl_d01_3845 on dw_coclbl_m02_3848.PRODUCT_NO = dw_coclbl_d01_3845.PRODUCT_NO

insert into CI_CUSER_20141117142123638 ( PRODUCT_NO,ATTR_COL_0000,ATTR_COL_0001) select mainResult.PRODUCT_NO,dw_coclbl_m02_3848.L1_01_02_01,dw_coclbl_m02_3848.L1_01_03_01 from (select PRODUCT_NO from CI_CUSER_20141114203632267) mainResult left join DW_COCLBL_M02_201407 dw_coclbl_m02_3848 on mainResult.PRODUCT_NO = dw_coclbl_m02_3848.PRODUCT_NO 


CREATE TABLE ci_cuser_yymmddhhmisstttttt_tmp(product_no string) row format serde 'com.bizo.hive.serde.csv.CSVSerde' ; 
LOAD DATA LOCAL INPATH '/home/ocdc/coc/yuli/test123.csv' OVERWRITE INTO TABLE test_yuli2;

创建支持CSV格式的testfile文件
CREATE TABLE test_yuli7 row format serde 'com.bizo.hive.serde.csv.CSVSerde' as select * from CI_CUSER_20150310162729786;

不依赖CSVSerde的jar包创建逗号分隔的表
"create table " +listName+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" +
" as select * from " + listName1;

create table aaaa ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE as select * from

ThriftServer 开启FAIR模式
SparkSQL Thrift Server 开启FAIR调度方式:
1. 修改$SPARK_HOME/conf/spark-defaults.conf,新增
2. spark.scheduler.mode FAIR
3. spark.scheduler.allocation.file /Users/tianyi/github/community/apache-spark/conf/fair-scheduler.xml
4. 修改$SPARK_HOME/conf/fair-scheduler.xml(或新增该文件), 编辑如下格式内容
5. <?xml version="1.0"?>
6. <allocations>
7. <pool name="production">
8. <schedulingMode>FAIR</schedulingMode>
9. <!-- weight表示两个队列在minShare相同的情况下,可以使用资源的比例 -->
10. <weight>1</weight>
11. <!-- minShare表示优先保证的资源数 -->
12. <minShare>2</minShare>
13. </pool>
14. <pool name="test">
15. <schedulingMode>FIFO</schedulingMode>
16. <weight>2</weight>
17. <minShare>3</minShare>
18. </pool>
19. </allocations>
20. 重启Thrift Server
21. 执行SQL前,执行 
22. set spark.sql.thriftserver.scheduler.pool=指定的队列名

等操作完了 create table yangsy555 like CI_CUSER_YYMMDDHHMISSTTTTTT 然后insert into yangsy555 select * from yangsy555

 

创建一个自增序列表,使用row_number() over()为表增加序列号 以供分页查询

create table yagnsytest2 as SELECT ROW_NUMBER() OVER() as id,* from yangsytest;

 

 

Sparksql的解析与Hiveql的解析的执行流程:

posted @ 2017-10-23 21:03 xzc 阅读(277) | 评论 (0)编辑 收藏
  2017年9月14日
如果用传统SCP远程拷贝,速度是比较慢的。现在采用lz4压缩传输。LZ4是一个非常快的无损压缩算法,压缩速度在单核300MB/S,可扩展支持多核CPU。它还具有一个非常快速的解码器,速度单核可达到和超越1GB/S。通常能够达到多核系统上的RAM速度限制。 你PV 全命为Pipe Viewer,利用它我们可以查看到命令执行的进度。 下面介绍下lz4和pv的安装,下载软件: 下载pv-1.1.4.tar.gz wget http://sourceforge.jp/projects/sfnet_pipeviewer/downloads/pipeviewer/1.1.4/pv-1.1.4.tar.bz2/ 下lz4的包难一些,可能要FQ:https://dl.dropboxusercontent.com/u/59565338/LZ4/lz4-r108.tar.gz 安装灰常简单: pv安装: [root ~]$ tar jxvf pv-1.1.4.tar.bz2 [root ~]$ cd pv-1.1.4 [root pv-1.1.4]$ ./configure && make && make install lz4安装: [root ~]$ tar zxvf lz4-r108.tar.gz [root ~]$ cd lz4-r108 [root lz4-r108]$ make && make install 用法:(-c 后指定要传输的文件,ssh -p 是指定端口,后面的ip是目标主机的ip, -xC指定传到目标主机下的那个目录下,别的不用修改): tar -c mysql-slave-3307 |pv|lz4 -B4|ssh -p10022 -c arcfour128 -o"MACs umac-64@openssh.com" 192.168.100.234 "lz4 -d |tar -xC /data" 下面是我线上传一个从库的效果: 看到了吧,25.7G 只需要接近3分钟,这样远比scp速度快上了好几倍,直接scp拷贝离散文件,很消耗IO,而使用LZ4快速压缩,对性能影响不大,传输速度快 PS:下次补充同机房不同网段的传输效果及跨机房的传输效果^0^ 作者:陆炫志 出处:xuanzhi的博客 http://www.cnblogs.com/xuanzhi201111 您的支持是对博主最大的鼓励,感谢您的认真阅读。本文版权归作者所有,欢迎转载,但请保留该声明。
posted @ 2017-09-14 18:24 xzc 阅读(179) | 评论 (0)编辑 收藏
王 腾腾 和 邵 兵 2015 年 11 月 26 日发布 WeiboGoogle+用电子邮件发送本页面 Comments 1 引子 随着云时代的来临,大数据(Big data)也获得了越来越多的关注。著云台的分析师团队认为,大数据(Big data)通常用来形容一个公司创造的大量非结构化和半结构化数据,这些数据在下载到关系型数据库用于分析时会花费过多时间和金钱。大数据分析常和云计算联系到一起,因为实时的大型数据集分析需要像 MapReduce 一样的框架来向数十、数百或甚至数千的电脑分配工作。 “大数据”在互联网行业指的是这样一种现象:互联网公司在日常运营中生成、累积的用户网络行为数据。这些数据的规模是如此庞大,以至于不能用 G 或 T 来衡量。所以如何高效的处理分析大数据的问题摆在了面前。对于大数据的处理优化方式有很多种,本文中主要介绍在使用 Hadoop 平台中对数据进行压缩处理来提高数据处理效率。 压缩简介 Hadoop 作为一个较通用的海量数据处理平台,每次运算都会需要处理大量数据,我们会在 Hadoop 系统中对数据进行压缩处理来优化磁盘使用率,提高数据在磁盘和网络中的传输速度,从而提高系统处理数据的效率。在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。综合所述,使用压缩的优点如下: 1. 节省数据占用的磁盘空间; 2. 加快数据在磁盘和网络中的传输速度,从而提高系统的处理速度。 压缩格式 Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。 Hadoop 对每个压缩格式的支持, 详细见下表: 表 1. 压缩格式 压缩格式 工具 算法 扩展名 多文件 可分割性 DEFLATE 无 DEFLATE .deflate 不 不 GZIP gzip DEFLATE .gzp 不 不 ZIP zip DEFLATE .zip 是 是,在文件范围内 BZIP2 bzip2 BZIP2 .bz2 不 是 LZO lzop LZO .lzo 不 是 如果压缩的文件没有扩展名,则需要在执行 MapReduce 任务的时候指定输入格式。 1 2 3 4 5 hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/ hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper / usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/ reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 - jobconf mapred.reduce.tasks=1*-inputformatorg.apache.hadoop.mapred.LzoTextInputFormat* 性能对比 Hadoop 下各种压缩算法的压缩比,压缩时间,解压时间见下表: 表 2. 性能对比 压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s 因此我们可以得出: 1) Bzip2 压缩效果明显是最好的,但是 bzip2 压缩速度慢,可分割。 2) Gzip 压缩效果不如 Bzip2,但是压缩解压速度快,不支持分割。 3) LZO 压缩效果不如 Bzip2 和 Gzip,但是压缩解压速度最快!并且支持分割! 这里提一下,文件的可分割性在 Hadoop 中是很非常重要的,它会影响到在执行作业时 Map 启动的个数,从而会影响到作业的执行效率! 所有的压缩算法都显示出一种时间空间的权衡,更快的压缩和解压速度通常会耗费更多的空间。在选择使用哪种压缩格式时,我们应该根据自身的业务需求来选择。 下图是在本地压缩与通过流将压缩结果上传到 BI 的时间对比。 图 1. 时间对比 图 1. 时间对比 使用方式 MapReduce 可以在三个阶段中使用压缩。 1. 输入压缩文件。如果输入的文件是压缩过的,那么在被 MapReduce 读取时,它们会被自动解压。 2.MapReduce 作业中,对 Map 输出的中间结果集压缩。实现方式如下: 1)可以在 core-site.xml 文件中配置,代码如下 图 2. core-site.xml 代码示例 图 2. core-site.xml 代码示例 2)使用 Java 代码指定 1 2 conf.setCompressMapOut(true); conf.setMapOutputCompressorClass(GzipCode.class); 最后一行代码指定 Map 输出结果的编码器。 3.MapReduce 作业中,对 Reduce 输出的最终结果集压。实现方式如下: 1)可以在 core-site.xml 文件中配置,代码如下 图 3. core-site.xml 代码示例 图 3. core-site.xml 代码示例 2)使用 Java 代码指定 1 2 conf.setBoolean(“mapred.output.compress”,true); conf.setClass(“mapred.output.compression.codec”,GzipCode.class,CompressionCodec.class); 最后一行同样指定 Reduce 输出结果的编码器。 压缩框架 我们前面已经提到过关于压缩的使用方式,其中第一种就是将压缩文件直接作为入口参数交给 MapReduce 处理,MapReduce 会自动根据压缩文件的扩展名来自动选择合适解压器处理数据。那么到底是怎么实现的呢?如下图所示: 图 4. 压缩实现情形 图 4. 压缩实现情形 我们在配置 Job 作业的时候,会设置数据输入的格式化方式,使用 conf.setInputFormat() 方法,这里的入口参数是 TextInputFormat.class。 TextInputFormat.class 继承于 InputFormat.class,主要用于对数据进行两方面的预处理。一是对输入数据进行切分,生成一组 split,一个 split 会分发给一个 mapper 进行处理;二是针对每个 split,再创建一个 RecordReader 读取 split 内的数据,并按照的形式组织成一条 record 传给 map 函数进行处理。此类在对数据进行切分之前,会首先初始化压缩解压工程类 CompressionCodeFactory.class,通过工厂获取实例化的编码解码器 CompressionCodec 后对数据处理操作。 下面我们来详细的看一下从压缩工厂获取编码解码器的过程。 压缩解压工厂类 CompressionCodecFactory 压缩解压工厂类 CompressionCodeFactory.class 主要功能就是负责根据不同的文件扩展名来自动获取相对应的压缩解压器 CompressionCodec.class,是整个压缩框架的核心控制器。我们来看下 CompressionCodeFactory.class 中的几个重要方法: 1. 初始化方法 图 5. 代码示例 图 5. 代码示例 ① getCodeClasses(conf) 负责获取关于编码解码器 CompressionCodec.class 的配置信息。下面将会详细讲解。 ② 默认添加两种编码解码器。当 getCodeClass(conf) 方法没有读取到相关的编码解码器 CompressionCodec.class 的配置信息时,系统会默认添加两种编码解码器 CompressionCodec.class,分别是 GzipCode.class 和 DefaultCode.class。 ③ addCode(code) 此方法用于将编码解码器 CompressionCodec.class 添加到系统缓存中。下面将会详细讲解。 2. getCodeClasses(conf) 图 6. 代码示例 图 6. 代码示例 ① 这里我们可以看,系统读取关于编码解码器 CompressionCodec.class 的配置信息在 core-site.xml 中 io.compression.codes 下。我们看下这段配置文件,如下图所示: 图 7. 代码示例 图 7. 代码示例 Value 标签中是每个编码解码 CompressionCodec.class 的完整路径,中间用逗号分隔。我们只需要将自己需要使用到的编码解码配置到此属性中,系统就会自动加载到缓存中。 除了上述的这种方式以外,Hadoop 为我们提供了另一种加载方式:代码加载。同样最终将信息配置在 io.compression.codes 属性中,代码如下: 1 2 conf.set("io.compression.codecs","org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");) 3. addCode(code) 方法添加编码解码器 图 8. 代码示例 图 8. 代码示例 addCodec(codec) 方法入口参数是个编码解码器 CompressionCodec.class,这里我们会首先接触到它的一个方法。 ① codec.getDefaultExtension() 方法看方法名的字面意思我们就可以知道,此方法用于获取此编码解码所对应文件的扩展名,比如,文件名是 xxxx.gz2,那么这个方法的返回值就是“.bz2”,我们来看下 org.apache.hadoop.io.compress.BZip2Codec 此方法的实现代码: 图 9. 代码示例 图 9. 代码示例 ② Codecs 是一个 SortedMap 的示例。这里有个很有意思的地方,它将 Key 值,也就是通过 codec.getDefaultExtension() 方法获取到的文件扩展名进行了翻转,举个例子,比如文件名扩展名“.bz2”,将文件名翻转之后就变成了“2zb.”。 系统加载完所有的编码解码器后,我们可以得到这样一个有序映射表,如下: 图 10. 代码示例 图 10. 代码示例 现在编码解码器都有了,我们怎么得到对应的编码解码器呢?看下面这个方法。 4. getCodec() 方法 此方法用于获取文件所对应的的编码解码器 CompressionCodec.class。 图 11. 代码示例 图 11. 代码示例 getCodec(Path) 方法的输入参数是 Path 对象,保存着文件路径。 ① 将文件名翻转。如 xxxx.bz2 翻转成 2zb.xxxx。 ② 获取 codecs 集合中最接近 2zb.xxxx 的值。此方法有返回值同样是个 SortMap 对象。 在这里对返回的 SortMap 对象进行第二次筛选。 编码解码器 CompressionCodec 刚刚在介绍压缩解压工程类 CompressionCodeFactory.class 的时候,我们多次提到了压缩解压器 CompressionCodecclass,并且我们在上文中还提到了它其中的一个用于获取文件扩展名的方法 getDefaultExtension()。 压缩解压工程类 CompressionCodeFactory.class 使用的是抽象工厂的设计模式。它是一个接口,制定了一系列方法,用于创建特定压缩解压算法。下面我们来看下比较重要的几个方法: 1. createOutputStream() 方法对数据流进行压缩。 图 12. 代码示例 图 12. 代码示例 此方法提供了方法重载。 ① 基于流的压缩处理; ② 基于压缩机 Compress.class 的压缩处理 2. createInputStream() 方法对数据流进行解压。 图 13. 代码示例 图 13. 代码示例 这里的解压方法同样提供了方法重载。 ① 基于流的解压处理; ② 基于解压机 Decompressor.class 的解压处理; 关于压缩/解压流与压缩/解压机会在下面的文章中我们会详细讲解。此处暂作了解。 3. getCompressorType() 返回需要的编码器的类型。 getDefaultExtension() 获取对应文件扩展名的方法。前文已提到过,不再敖述。 压缩机 Compressor 和解压机 Decompressor 前面在编码解码器部分的 createInputStream() 和 createInputStream() 方法中我们提到过 Compressor.class 和 Decompressor.class 对象。在 Hadoop 的实现中,数据编码器和解码器被抽象成了两个接口: 1. org.apache.hadoop.io.compress.Compressor; 2. org.apache.hadoop.io.compress.Decompressor; 它们规定了一系列的方法,所以在 Hadoop 内部的编码/解码算法实现都需要实现对应的接口。在实际的数据压缩与解压缩过程,Hadoop 为用户提供了统一的 I/O 流处理模式。 我们看一下压缩机 Compressor.class,代码如下: 图 14. 代码示例 图 14. 代码示例 ① setInput() 方法接收数据到内部缓冲区,可以多次调用; ② needsInput() 方法用于检查缓冲区是否已满。如果是 false 则说明当前的缓冲区已满; ③ getBytesRead() 输入未压缩字节的总数; ④ getBytesWritten() 输出压缩字节的总数; ⑤ finish() 方法结束数据输入的过程; ⑥ finished() 方法用于检查是否已经读取完所有的等待压缩的数据。如果返回 false,表明压缩器中还有未读取的压缩数据,可以继续通过 compress() 方法读取; ⑦ compress() 方法获取压缩后的数据,释放缓冲区空间; ⑧ reset() 方法用于重置压缩器,以处理新的输入数据集合; ⑨ end() 方法用于关闭解压缩器并放弃所有未处理的输入; ⑩ reinit() 方法更进一步允许使用 Hadoop 的配置系统,重置并重新配置压缩器; 为了提高压缩效率,并不是每次用户调用 setInput() 方法,压缩机就会立即工作,所以,为了通知压缩机所有数据已经写入,必须使用 finish() 方法。finish() 调用结束后,压缩机缓冲区中保持的已经压缩的数据,可以继续通过 compress() 方法获得。至于要判断压缩机中是否还有未读取的压缩数据,则需要利用 finished() 方法来判断。 压缩流 CompressionOutputStream 和解压缩流 CompressionInputStream 前文编码解码器部分提到过 createInputStream() 方法返回 CompressionOutputStream 对象,createInputStream() 方法返回 CompressionInputStream 对象。这两个类分别继承自 java.io.OutputStream 和 java.io.InputStream。从而我们不难理解,这两个对象的作用了吧。 我们来看下 CompressionInputStream.class 的代码: 图 15. 代码示例 图 15. 代码示例 可以看到 CompressionOutputStream 实现了 OutputStream 的 close() 方法和 flush() 方法,但用于输出数据的 write() 方法以及用于结束压缩过程并将输入写到底层流的 finish() 方法和重置压缩状态的 resetState() 方法还是抽象方法,需要 CompressionOutputStream 的子类实现。 Hadoop 压缩框架中为我们提供了一个实现了 CompressionOutputStream 类通用的子类 CompressorStream.class。 图 16. 代码示例 图 16. 代码示例 CompressorStream.class 提供了三个不同的构造函数,CompressorStream 需要的底层输出流 out 和压缩时使用的压缩器,都作为参数传入构造函数。另一个参数是 CompressorStream 工作时使用的缓冲区 buffer 的大小,构造时会利用这个参数分配该缓冲区。第一个可以手动设置缓冲区大小,第二个默认 512,第三个没有缓冲区且不可使用压缩器。 图 17. 代码示例 图 17. 代码示例 在 write()、compress()、finish() 以及 resetState() 方法中,我们发现了压缩机 Compressor 的身影,前面文章我们已经介绍过压缩机的的实现过程,通过调用 setInput() 方法将待压缩数据填充到内部缓冲区,然后调用 needsInput() 方法检查缓冲区是否已满,如果缓冲区已满,将调用 compress() 方法对数据进行压缩。流程如下图所示: 图 18. 调用流程图 图 18. 调用流程图 结束语 本文深入到 Hadoop 平台压缩框架内部,对其核心代码以及各压缩格式的效率进行对比分析,以帮助读者在使用 Hadoop 平台时,可以通过对数据进行压缩处理来提高数据处理效率。当再次面临海量数据处理时, Hadoop 平台的压缩机制可以让我们事半功倍。 相关主题 Hadoop 在线 API 《Hadoop 技术内幕深入解析 HADOOP COMMON 和 HDFS 架构设计与实现原理》 developerWorks 开源技术主题:查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。
posted @ 2017-09-14 17:35 xzc 阅读(316) | 评论 (0)编辑 收藏
  2017年9月10日

Linux系统查看当前主机CPU、内存、机器型号及主板信息:


查看CPU信息(型号)
# cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c

 

查看内存信息
# cat /proc/meminfo

 

查看主板型号:
# dmidecode |grep -A16 "System Information$"

 

查看机器型号
# dmidecode | grep "Product Name"

 

查看当前操作系统内核信息
# uname -a

 

查看当前操作系统发行版信息
# cat /etc/issue | grep Linux

posted @ 2017-09-10 16:37 xzc 阅读(82) | 评论 (0)编辑 收藏
仅列出标题  下一页