随笔-307  评论-208  文章-0  trackbacks-0
  2017年10月23日
     摘要: spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数 1  应用场景:   1、我们需要统计用户的总使用时长(累加历史)   2、前台展现页面需要对多个维度进行查询,如:产品、地区等等   3、需要展现的表格头如: 产品、2015-04、2015-05、2015-06 2 原始数据: product_code |event_date |dur...  阅读全文
posted @ 2017-10-23 22:05 xzc 阅读(60) | 评论 (0)编辑 收藏
     摘要: Spark1.4发布,支持了窗口分析函数(window functions)。在离线平台中,90%以上的离线分析任务都是使用Hive实现,其中必然会使用很多窗口分析函数,如果SparkSQL支持窗口分析函数, 那么对于后面Hive向SparkSQL中的迁移的工作量会大大降低,使用方式如下: 1、初始化数据 创建表 [sql] view plain cop...  阅读全文
posted @ 2017-10-23 22:04 xzc 阅读(33) | 评论 (0)编辑 收藏

SparkSQL相关语句总结

1.in 不支持子查询 eg. select * from src where key in(select key from test);
支持查询个数 eg. select * from src where key in(1,2,3,4,5);
in 40000个 耗时25.766秒
in 80000个 耗时78.827秒

2.union all/union
不支持顶层的union all eg. select key from src UNION ALL select key from test;
支持select * from (select key from src union all select key from test)aa;
不支持 union
支持select distinct key from (select key from src union all select key from test)aa;

3.intersect 不支持

4.minus 不支持

5.except 不支持

6.inner join/join/left outer join/right outer join/full outer join/left semi join 都支持
left outer join/right outer join/full outer join 中间必须有outer
join是最简单的关联操作,两边关联只取交集;
left outer join是以左表驱动,右表不存在的key均赋值为null;
right outer join是以右表驱动,左表不存在的key均赋值为null;
full outer join全表关联,将两表完整的进行笛卡尔积操作,左右表均可赋值为null;
left semi join最主要的使用场景就是解决exist in;
Hive不支持where子句中的子查询,SQL常用的exist in子句在Hive中是不支持的
不支持子查询 eg. select * from src aa where aa.key in(select bb.key from test bb);
可用以下两种方式替换:
select * from src aa left outer join test bb on aa.key=bb.key where bb.key <> null;
select * from src aa left semi join test bb on aa.key=bb.key;
大多数情况下 JOIN ON 和 left semi on 是对等的
A,B两表连接,如果B表存在重复数据
当使用JOIN ON的时候,A,B表会关联出两条记录,应为ON上的条件符合; 
而是用LEFT SEMI JOIN 当A表中的记录,在B表上产生符合条件之后就返回,不会再继续查找B表记录了,
所以如果B表有重复,也不会产生重复的多条记录。 
left outer join 支持子查询 eg. select aa.* from src aa left outer join (select * from test111)bb on aa.key=bb.a;

7. hive四中数据导入方式
1)从本地文件系统中导入数据到Hive表
create table wyp(id int,name string) ROW FORMAT delimited fields terminated by '\t' STORED AS TEXTFILE;
load data local inpath 'wyp.txt' into table wyp;
2)从HDFS上导入数据到Hive表
[wyp@master /home/q/hadoop-2.2.0]$ bin/hadoop fs -cat /home/wyp/add.txt
hive> load data inpath '/home/wyp/add.txt' into table wyp;
3)从别的表中查询出相应的数据并导入到Hive表中
hive> create table test(
> id int, name string
> ,tel string)
> partitioned by
> (age int)
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY '\t'
> STORED AS TEXTFILE;

注:test表里面用age作为了分区字段,分区:在Hive中,表的每一个分区对应表下的相应目录,所有分区的数据都是存储在对应的目录中。
比如wyp表有dt和city两个分区,则对应dt=20131218city=BJ对应表的目录为/user/hive/warehouse/dt=20131218/city=BJ,
所有属于这个分区的数据都存放在这个目录中。

hive> insert into table test
> partition (age='25')
> select id, name, tel
> from wyp;

也可以在select语句里面通过使用分区值来动态指明分区:
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> insert into table test
> partition (age)
> select id, name,
> tel, age
> from wyp;

Hive也支持insert overwrite方式来插入数据
hive> insert overwrite table test
> PARTITION (age)
> select id, name, tel, age
> from wyp;

Hive还支持多表插入
hive> from wyp
> insert into table test
> partition(age)
> select id, name, tel, age
> insert into table test3
> select id, name
> where age>25;
4)在创建表的时候通过从别的表中查询出相应的记录并插入到所创建的表中
hive> create table test4
> as
> select id, name, tel
> from wyp;

8.查看建表语句
hive> show create table test3;

9.表重命名
hive> ALTER TABLE events RENAME TO 3koobecaf; 

10.表增加列
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT); 

11.添加一列并增加列字段注释 
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment'); 

12.删除表
hive> DROP TABLE pokes; 

13.top n
hive> select * from test order by key limit 10;
14.创建数据库
Create Database baseball;

14.alter table tablename  change oldColumn newColumn column_type 修改列的名称和类型

alter table yangsy CHANGE product_no phone_no string

 

15.导入.sql文件中的sql

 spark-sql --driver-class-path /home/hadoop/hive/lib/mysql-connector-java-5.1.30-bin.jar -f testsql.sql 


insert into table CI_CUSER_20141117154351522 select mainResult.PRODUCT_NO,dw_coclbl_m02_3848.L1_01_02_01,dw_coclbl_d01_3845.L2_01_01_04 from (select PRODUCT_NO from CI_CUSER_20141114203632267) mainResult left join DW_COCLBL_M02_201407 dw_coclbl_m02_3848 on mainResult.PRODUCT_NO = dw_coclbl_m02_3848.PRODUCT_NO left join DW_COCLBL_D01_20140515 dw_coclbl_d01_3845 on dw_coclbl_m02_3848.PRODUCT_NO = dw_coclbl_d01_3845.PRODUCT_NO

insert into CI_CUSER_20141117142123638 ( PRODUCT_NO,ATTR_COL_0000,ATTR_COL_0001) select mainResult.PRODUCT_NO,dw_coclbl_m02_3848.L1_01_02_01,dw_coclbl_m02_3848.L1_01_03_01 from (select PRODUCT_NO from CI_CUSER_20141114203632267) mainResult left join DW_COCLBL_M02_201407 dw_coclbl_m02_3848 on mainResult.PRODUCT_NO = dw_coclbl_m02_3848.PRODUCT_NO 


CREATE TABLE ci_cuser_yymmddhhmisstttttt_tmp(product_no string) row format serde 'com.bizo.hive.serde.csv.CSVSerde' ; 
LOAD DATA LOCAL INPATH '/home/ocdc/coc/yuli/test123.csv' OVERWRITE INTO TABLE test_yuli2;

创建支持CSV格式的testfile文件
CREATE TABLE test_yuli7 row format serde 'com.bizo.hive.serde.csv.CSVSerde' as select * from CI_CUSER_20150310162729786;

不依赖CSVSerde的jar包创建逗号分隔的表
"create table " +listName+ " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" +
" as select * from " + listName1;

create table aaaa ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE as select * from

ThriftServer 开启FAIR模式
SparkSQL Thrift Server 开启FAIR调度方式:
1. 修改$SPARK_HOME/conf/spark-defaults.conf,新增
2. spark.scheduler.mode FAIR
3. spark.scheduler.allocation.file /Users/tianyi/github/community/apache-spark/conf/fair-scheduler.xml
4. 修改$SPARK_HOME/conf/fair-scheduler.xml(或新增该文件), 编辑如下格式内容
5. <?xml version="1.0"?>
6. <allocations>
7. <pool name="production">
8. <schedulingMode>FAIR</schedulingMode>
9. <!-- weight表示两个队列在minShare相同的情况下,可以使用资源的比例 -->
10. <weight>1</weight>
11. <!-- minShare表示优先保证的资源数 -->
12. <minShare>2</minShare>
13. </pool>
14. <pool name="test">
15. <schedulingMode>FIFO</schedulingMode>
16. <weight>2</weight>
17. <minShare>3</minShare>
18. </pool>
19. </allocations>
20. 重启Thrift Server
21. 执行SQL前,执行 
22. set spark.sql.thriftserver.scheduler.pool=指定的队列名

等操作完了 create table yangsy555 like CI_CUSER_YYMMDDHHMISSTTTTTT 然后insert into yangsy555 select * from yangsy555

 

创建一个自增序列表,使用row_number() over()为表增加序列号 以供分页查询

create table yagnsytest2 as SELECT ROW_NUMBER() OVER() as id,* from yangsytest;

 

 

Sparksql的解析与Hiveql的解析的执行流程:

posted @ 2017-10-23 21:03 xzc 阅读(97) | 评论 (0)编辑 收藏
  2017年9月14日
如果用传统SCP远程拷贝,速度是比较慢的。现在采用lz4压缩传输。LZ4是一个非常快的无损压缩算法,压缩速度在单核300MB/S,可扩展支持多核CPU。它还具有一个非常快速的解码器,速度单核可达到和超越1GB/S。通常能够达到多核系统上的RAM速度限制。 你PV 全命为Pipe Viewer,利用它我们可以查看到命令执行的进度。 下面介绍下lz4和pv的安装,下载软件: 下载pv-1.1.4.tar.gz wget http://sourceforge.jp/projects/sfnet_pipeviewer/downloads/pipeviewer/1.1.4/pv-1.1.4.tar.bz2/ 下lz4的包难一些,可能要FQ:https://dl.dropboxusercontent.com/u/59565338/LZ4/lz4-r108.tar.gz 安装灰常简单: pv安装: [root ~]$ tar jxvf pv-1.1.4.tar.bz2 [root ~]$ cd pv-1.1.4 [root pv-1.1.4]$ ./configure && make && make install lz4安装: [root ~]$ tar zxvf lz4-r108.tar.gz [root ~]$ cd lz4-r108 [root lz4-r108]$ make && make install 用法:(-c 后指定要传输的文件,ssh -p 是指定端口,后面的ip是目标主机的ip, -xC指定传到目标主机下的那个目录下,别的不用修改): tar -c mysql-slave-3307 |pv|lz4 -B4|ssh -p10022 -c arcfour128 -o"MACs umac-64@openssh.com" 192.168.100.234 "lz4 -d |tar -xC /data" 下面是我线上传一个从库的效果: 看到了吧,25.7G 只需要接近3分钟,这样远比scp速度快上了好几倍,直接scp拷贝离散文件,很消耗IO,而使用LZ4快速压缩,对性能影响不大,传输速度快 PS:下次补充同机房不同网段的传输效果及跨机房的传输效果^0^ 作者:陆炫志 出处:xuanzhi的博客 http://www.cnblogs.com/xuanzhi201111 您的支持是对博主最大的鼓励,感谢您的认真阅读。本文版权归作者所有,欢迎转载,但请保留该声明。
posted @ 2017-09-14 18:24 xzc 阅读(64) | 评论 (0)编辑 收藏
王 腾腾 和 邵 兵 2015 年 11 月 26 日发布 WeiboGoogle+用电子邮件发送本页面 Comments 1 引子 随着云时代的来临,大数据(Big data)也获得了越来越多的关注。著云台的分析师团队认为,大数据(Big data)通常用来形容一个公司创造的大量非结构化和半结构化数据,这些数据在下载到关系型数据库用于分析时会花费过多时间和金钱。大数据分析常和云计算联系到一起,因为实时的大型数据集分析需要像 MapReduce 一样的框架来向数十、数百或甚至数千的电脑分配工作。 “大数据”在互联网行业指的是这样一种现象:互联网公司在日常运营中生成、累积的用户网络行为数据。这些数据的规模是如此庞大,以至于不能用 G 或 T 来衡量。所以如何高效的处理分析大数据的问题摆在了面前。对于大数据的处理优化方式有很多种,本文中主要介绍在使用 Hadoop 平台中对数据进行压缩处理来提高数据处理效率。 压缩简介 Hadoop 作为一个较通用的海量数据处理平台,每次运算都会需要处理大量数据,我们会在 Hadoop 系统中对数据进行压缩处理来优化磁盘使用率,提高数据在磁盘和网络中的传输速度,从而提高系统处理数据的效率。在使用压缩方式方面,主要考虑压缩速度和压缩文件的可分割性。综合所述,使用压缩的优点如下: 1. 节省数据占用的磁盘空间; 2. 加快数据在磁盘和网络中的传输速度,从而提高系统的处理速度。 压缩格式 Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。 Hadoop 对每个压缩格式的支持, 详细见下表: 表 1. 压缩格式 压缩格式 工具 算法 扩展名 多文件 可分割性 DEFLATE 无 DEFLATE .deflate 不 不 GZIP gzip DEFLATE .gzp 不 不 ZIP zip DEFLATE .zip 是 是,在文件范围内 BZIP2 bzip2 BZIP2 .bz2 不 是 LZO lzop LZO .lzo 不 是 如果压缩的文件没有扩展名,则需要在执行 MapReduce 任务的时候指定输入格式。 1 2 3 4 5 hadoop jar /usr/home/hadoop/hadoop-0.20.2/contrib/streaming/ hadoop-streaming-0.20.2-CD H3B4.jar -file /usr/home/hadoop/hello/mapper.py -mapper / usr/home/hadoop/hello/mapper.py -file /usr/home/hadoop/hello/ reducer.py -reducer /usr/home/hadoop/hello/reducer.py -input lzotest -output result4 - jobconf mapred.reduce.tasks=1*-inputformatorg.apache.hadoop.mapred.LzoTextInputFormat* 性能对比 Hadoop 下各种压缩算法的压缩比,压缩时间,解压时间见下表: 表 2. 性能对比 压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度 gzip 8.3GB 1.8GB 17.5MB/s 58MB/s bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s 因此我们可以得出: 1) Bzip2 压缩效果明显是最好的,但是 bzip2 压缩速度慢,可分割。 2) Gzip 压缩效果不如 Bzip2,但是压缩解压速度快,不支持分割。 3) LZO 压缩效果不如 Bzip2 和 Gzip,但是压缩解压速度最快!并且支持分割! 这里提一下,文件的可分割性在 Hadoop 中是很非常重要的,它会影响到在执行作业时 Map 启动的个数,从而会影响到作业的执行效率! 所有的压缩算法都显示出一种时间空间的权衡,更快的压缩和解压速度通常会耗费更多的空间。在选择使用哪种压缩格式时,我们应该根据自身的业务需求来选择。 下图是在本地压缩与通过流将压缩结果上传到 BI 的时间对比。 图 1. 时间对比 图 1. 时间对比 使用方式 MapReduce 可以在三个阶段中使用压缩。 1. 输入压缩文件。如果输入的文件是压缩过的,那么在被 MapReduce 读取时,它们会被自动解压。 2.MapReduce 作业中,对 Map 输出的中间结果集压缩。实现方式如下: 1)可以在 core-site.xml 文件中配置,代码如下 图 2. core-site.xml 代码示例 图 2. core-site.xml 代码示例 2)使用 Java 代码指定 1 2 conf.setCompressMapOut(true); conf.setMapOutputCompressorClass(GzipCode.class); 最后一行代码指定 Map 输出结果的编码器。 3.MapReduce 作业中,对 Reduce 输出的最终结果集压。实现方式如下: 1)可以在 core-site.xml 文件中配置,代码如下 图 3. core-site.xml 代码示例 图 3. core-site.xml 代码示例 2)使用 Java 代码指定 1 2 conf.setBoolean(“mapred.output.compress”,true); conf.setClass(“mapred.output.compression.codec”,GzipCode.class,CompressionCodec.class); 最后一行同样指定 Reduce 输出结果的编码器。 压缩框架 我们前面已经提到过关于压缩的使用方式,其中第一种就是将压缩文件直接作为入口参数交给 MapReduce 处理,MapReduce 会自动根据压缩文件的扩展名来自动选择合适解压器处理数据。那么到底是怎么实现的呢?如下图所示: 图 4. 压缩实现情形 图 4. 压缩实现情形 我们在配置 Job 作业的时候,会设置数据输入的格式化方式,使用 conf.setInputFormat() 方法,这里的入口参数是 TextInputFormat.class。 TextInputFormat.class 继承于 InputFormat.class,主要用于对数据进行两方面的预处理。一是对输入数据进行切分,生成一组 split,一个 split 会分发给一个 mapper 进行处理;二是针对每个 split,再创建一个 RecordReader 读取 split 内的数据,并按照的形式组织成一条 record 传给 map 函数进行处理。此类在对数据进行切分之前,会首先初始化压缩解压工程类 CompressionCodeFactory.class,通过工厂获取实例化的编码解码器 CompressionCodec 后对数据处理操作。 下面我们来详细的看一下从压缩工厂获取编码解码器的过程。 压缩解压工厂类 CompressionCodecFactory 压缩解压工厂类 CompressionCodeFactory.class 主要功能就是负责根据不同的文件扩展名来自动获取相对应的压缩解压器 CompressionCodec.class,是整个压缩框架的核心控制器。我们来看下 CompressionCodeFactory.class 中的几个重要方法: 1. 初始化方法 图 5. 代码示例 图 5. 代码示例 ① getCodeClasses(conf) 负责获取关于编码解码器 CompressionCodec.class 的配置信息。下面将会详细讲解。 ② 默认添加两种编码解码器。当 getCodeClass(conf) 方法没有读取到相关的编码解码器 CompressionCodec.class 的配置信息时,系统会默认添加两种编码解码器 CompressionCodec.class,分别是 GzipCode.class 和 DefaultCode.class。 ③ addCode(code) 此方法用于将编码解码器 CompressionCodec.class 添加到系统缓存中。下面将会详细讲解。 2. getCodeClasses(conf) 图 6. 代码示例 图 6. 代码示例 ① 这里我们可以看,系统读取关于编码解码器 CompressionCodec.class 的配置信息在 core-site.xml 中 io.compression.codes 下。我们看下这段配置文件,如下图所示: 图 7. 代码示例 图 7. 代码示例 Value 标签中是每个编码解码 CompressionCodec.class 的完整路径,中间用逗号分隔。我们只需要将自己需要使用到的编码解码配置到此属性中,系统就会自动加载到缓存中。 除了上述的这种方式以外,Hadoop 为我们提供了另一种加载方式:代码加载。同样最终将信息配置在 io.compression.codes 属性中,代码如下: 1 2 conf.set("io.compression.codecs","org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec,com.hadoop.compression.lzo.LzopCodec");) 3. addCode(code) 方法添加编码解码器 图 8. 代码示例 图 8. 代码示例 addCodec(codec) 方法入口参数是个编码解码器 CompressionCodec.class,这里我们会首先接触到它的一个方法。 ① codec.getDefaultExtension() 方法看方法名的字面意思我们就可以知道,此方法用于获取此编码解码所对应文件的扩展名,比如,文件名是 xxxx.gz2,那么这个方法的返回值就是“.bz2”,我们来看下 org.apache.hadoop.io.compress.BZip2Codec 此方法的实现代码: 图 9. 代码示例 图 9. 代码示例 ② Codecs 是一个 SortedMap 的示例。这里有个很有意思的地方,它将 Key 值,也就是通过 codec.getDefaultExtension() 方法获取到的文件扩展名进行了翻转,举个例子,比如文件名扩展名“.bz2”,将文件名翻转之后就变成了“2zb.”。 系统加载完所有的编码解码器后,我们可以得到这样一个有序映射表,如下: 图 10. 代码示例 图 10. 代码示例 现在编码解码器都有了,我们怎么得到对应的编码解码器呢?看下面这个方法。 4. getCodec() 方法 此方法用于获取文件所对应的的编码解码器 CompressionCodec.class。 图 11. 代码示例 图 11. 代码示例 getCodec(Path) 方法的输入参数是 Path 对象,保存着文件路径。 ① 将文件名翻转。如 xxxx.bz2 翻转成 2zb.xxxx。 ② 获取 codecs 集合中最接近 2zb.xxxx 的值。此方法有返回值同样是个 SortMap 对象。 在这里对返回的 SortMap 对象进行第二次筛选。 编码解码器 CompressionCodec 刚刚在介绍压缩解压工程类 CompressionCodeFactory.class 的时候,我们多次提到了压缩解压器 CompressionCodecclass,并且我们在上文中还提到了它其中的一个用于获取文件扩展名的方法 getDefaultExtension()。 压缩解压工程类 CompressionCodeFactory.class 使用的是抽象工厂的设计模式。它是一个接口,制定了一系列方法,用于创建特定压缩解压算法。下面我们来看下比较重要的几个方法: 1. createOutputStream() 方法对数据流进行压缩。 图 12. 代码示例 图 12. 代码示例 此方法提供了方法重载。 ① 基于流的压缩处理; ② 基于压缩机 Compress.class 的压缩处理 2. createInputStream() 方法对数据流进行解压。 图 13. 代码示例 图 13. 代码示例 这里的解压方法同样提供了方法重载。 ① 基于流的解压处理; ② 基于解压机 Decompressor.class 的解压处理; 关于压缩/解压流与压缩/解压机会在下面的文章中我们会详细讲解。此处暂作了解。 3. getCompressorType() 返回需要的编码器的类型。 getDefaultExtension() 获取对应文件扩展名的方法。前文已提到过,不再敖述。 压缩机 Compressor 和解压机 Decompressor 前面在编码解码器部分的 createInputStream() 和 createInputStream() 方法中我们提到过 Compressor.class 和 Decompressor.class 对象。在 Hadoop 的实现中,数据编码器和解码器被抽象成了两个接口: 1. org.apache.hadoop.io.compress.Compressor; 2. org.apache.hadoop.io.compress.Decompressor; 它们规定了一系列的方法,所以在 Hadoop 内部的编码/解码算法实现都需要实现对应的接口。在实际的数据压缩与解压缩过程,Hadoop 为用户提供了统一的 I/O 流处理模式。 我们看一下压缩机 Compressor.class,代码如下: 图 14. 代码示例 图 14. 代码示例 ① setInput() 方法接收数据到内部缓冲区,可以多次调用; ② needsInput() 方法用于检查缓冲区是否已满。如果是 false 则说明当前的缓冲区已满; ③ getBytesRead() 输入未压缩字节的总数; ④ getBytesWritten() 输出压缩字节的总数; ⑤ finish() 方法结束数据输入的过程; ⑥ finished() 方法用于检查是否已经读取完所有的等待压缩的数据。如果返回 false,表明压缩器中还有未读取的压缩数据,可以继续通过 compress() 方法读取; ⑦ compress() 方法获取压缩后的数据,释放缓冲区空间; ⑧ reset() 方法用于重置压缩器,以处理新的输入数据集合; ⑨ end() 方法用于关闭解压缩器并放弃所有未处理的输入; ⑩ reinit() 方法更进一步允许使用 Hadoop 的配置系统,重置并重新配置压缩器; 为了提高压缩效率,并不是每次用户调用 setInput() 方法,压缩机就会立即工作,所以,为了通知压缩机所有数据已经写入,必须使用 finish() 方法。finish() 调用结束后,压缩机缓冲区中保持的已经压缩的数据,可以继续通过 compress() 方法获得。至于要判断压缩机中是否还有未读取的压缩数据,则需要利用 finished() 方法来判断。 压缩流 CompressionOutputStream 和解压缩流 CompressionInputStream 前文编码解码器部分提到过 createInputStream() 方法返回 CompressionOutputStream 对象,createInputStream() 方法返回 CompressionInputStream 对象。这两个类分别继承自 java.io.OutputStream 和 java.io.InputStream。从而我们不难理解,这两个对象的作用了吧。 我们来看下 CompressionInputStream.class 的代码: 图 15. 代码示例 图 15. 代码示例 可以看到 CompressionOutputStream 实现了 OutputStream 的 close() 方法和 flush() 方法,但用于输出数据的 write() 方法以及用于结束压缩过程并将输入写到底层流的 finish() 方法和重置压缩状态的 resetState() 方法还是抽象方法,需要 CompressionOutputStream 的子类实现。 Hadoop 压缩框架中为我们提供了一个实现了 CompressionOutputStream 类通用的子类 CompressorStream.class。 图 16. 代码示例 图 16. 代码示例 CompressorStream.class 提供了三个不同的构造函数,CompressorStream 需要的底层输出流 out 和压缩时使用的压缩器,都作为参数传入构造函数。另一个参数是 CompressorStream 工作时使用的缓冲区 buffer 的大小,构造时会利用这个参数分配该缓冲区。第一个可以手动设置缓冲区大小,第二个默认 512,第三个没有缓冲区且不可使用压缩器。 图 17. 代码示例 图 17. 代码示例 在 write()、compress()、finish() 以及 resetState() 方法中,我们发现了压缩机 Compressor 的身影,前面文章我们已经介绍过压缩机的的实现过程,通过调用 setInput() 方法将待压缩数据填充到内部缓冲区,然后调用 needsInput() 方法检查缓冲区是否已满,如果缓冲区已满,将调用 compress() 方法对数据进行压缩。流程如下图所示: 图 18. 调用流程图 图 18. 调用流程图 结束语 本文深入到 Hadoop 平台压缩框架内部,对其核心代码以及各压缩格式的效率进行对比分析,以帮助读者在使用 Hadoop 平台时,可以通过对数据进行压缩处理来提高数据处理效率。当再次面临海量数据处理时, Hadoop 平台的压缩机制可以让我们事半功倍。 相关主题 Hadoop 在线 API 《Hadoop 技术内幕深入解析 HADOOP COMMON 和 HDFS 架构设计与实现原理》 developerWorks 开源技术主题:查找丰富的操作信息、工具和项目更新,帮助您掌握开源技术并将其用于 IBM 产品。
posted @ 2017-09-14 17:35 xzc 阅读(237) | 评论 (0)编辑 收藏
  2017年9月10日

Linux系统查看当前主机CPU、内存、机器型号及主板信息:


查看CPU信息(型号)
# cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c

 

查看内存信息
# cat /proc/meminfo

 

查看主板型号:
# dmidecode |grep -A16 "System Information$"

 

查看机器型号
# dmidecode | grep "Product Name"

 

查看当前操作系统内核信息
# uname -a

 

查看当前操作系统发行版信息
# cat /etc/issue | grep Linux

posted @ 2017-09-10 16:37 xzc 阅读(43) | 评论 (0)编辑 收藏
  2017年9月7日
本文介绍Hadoop YARN最近版本中增加的几个非常有用的特性,包括: (1)ResourceManager HA 在apache hadoop 2.4或者CDH5.0.0版本之后,增加了ResourceManger HA特性,支持基于Zookeeper的热主备切换,具体配置参数可以参考Cloudera的文档:ResourceManager HA配置。 需要注意的是,ResourceManager HA只完成了第一个阶段的设计,即备ResourceManager启动后,会杀死之前正在运行的Application,然后从共享存储系统中读取这些Application的元数据信息,并重新提交这些Application。启动ApplicationMaster后,剩下的容错功能就交给ApplicationMaster实现了,比如MapReduce的ApplicationMaster会不断地将完成的任务信息写到HDFS上,这样,当它重启时,可以重新读取这些日志,进而只需重新运行那些未完成的任务。ResourceManager HA第二个阶段的任务是,备ResourceManager接管主ResourceManager后,无需杀死那些正在运行的Application,让他们像任何事情没有发生一样运行下去。 (2) 磁盘容错 在apache hadoop 2.4或者CDH5.0.0版本之后,增加了几个对多磁盘非常友好地参数,这些参数允许YARN更好地使用NodeManager上的多块磁盘,相关jira为:YARN-1781,主要新增了三个参数: yarn.nodemanager.disk-health-checker.min-healthy-disks:NodeManager上最少保证健康磁盘比例,当健康磁盘比例低于该值时,NodeManager不会再接收和启动新的Container,默认值是0.25,表示25%; yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage:一块磁盘的最高使用率,当一块磁盘的使用率超过该值时,则认为该盘为坏盘,不再使用该盘,默认是100,表示100%,可以适当调低; yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb:一块磁盘最少保证剩余空间大小,当某块磁盘剩余空间低于该值时,将不再使用该盘,默认是0,表示0MB。 (3)资源调度器 Fair Scheduler:Fair Scheduler增加了一个非常有用的新特性,允许用户在线将一个应用程序从一个队列转移到另外一个队列,比如将一个重要作业从一个低优先级队列转移到高优先级队列,操作命令是:bin/yarn application -movetoqueue appID -queue targetQueueName,相关jira为:YARN-1721。 Capacity Scheduler:Capacity Scheduler中资源抢占功能经过了充分的测试,可以使用了。 原创文章,转载请注明: 转载自董的博客 本文链接地址: http://dongxicheng.org/mapreduce-nextgen/hadoop-yarn-recently-new-features/
posted @ 2017-09-07 11:37 xzc 阅读(40) | 评论 (0)编辑 收藏
  2017年8月30日
关于mapreduce程序运行在yarn上时内存的分配一直是一个让我蒙圈的事情,单独查任何一个资料都不能很好的理解透彻。于是,最近查了大量的资料,综合各种解释,终于理解到了一个比较清晰的程度,在这里将理解的东西做一个简单的记录,以备忘却。 首先,先将关于mapreduce和yarn关于内存分配的参数粘贴上: yarn.scheduler.minimum-allocation-mb yarn.scheduler.maximum-allocation-mb yarn.nodemanager.resource.memory-mb yarn.nodemanager.vmem-pmem-ratio yarn.scheduler.increment-allocation-mb mapreduce.map.memory.mb mapreduce.reduce.memory.mb mapreduce.map.java.opts mapreduce.reduce.java.opts 个人认为,针对mapreduce任务,这些参数只有放在一起学习才能真正理解,如果单独考虑,理解不清晰。下面开始详细讲解。 一、理解参数yarn.nodemanager.resource.memory-mb,yarn.nodemanager.vmem-pmem-ratio yarn.nodemanager.resource.memory-mb很简单,就是你的这台服务器节点上准备分给yarn的内存; yarn.nodemanager.vmem-pmem-ratio网上解释都是"每使用1MB物理内存,最多可用的虚拟内存数,默认2.1",但是目前我还是不太理解其作用是什么,有知道的朋友希望能详细解释下。 二、理解参数yarn.scheduler.minimum-allocation-mb和yarn.scheduler.maximum-allocation-mb 都知道,在yarn上运行程序时每个task都是在独立的Container中运行的,单个Container可以申请的最小和最大内存的限制就是这两个参数,注意,并不是这两个参数决定单个Container申请内存的大小,而仅仅是限制的一个范围。 三、理解yarn的内存规整化因子和内存规整化算法 先不说和哪个参数有关,单纯理解这一概念。举例: 假如规整化因子b=512M,上述讲的参数yarn.scheduler.minimum-allocation-mb为1024,yarn.scheduler.maximum-allocation-mb为8096,然后我打算给单个map任务申请内存资源(mapreduce.map.memory.mb): 申请的资源为a=1000M时,实际得到的Container内存大小为1024M(小于yarn.scheduler.minimum-allocation-mb的话自动设置为yarn.scheduler.minimum-allocation-mb); 申请的资源为a=1500M时,实际得到的Container内存大小为1536M,计算公式为:ceiling(a/b)*b,即ceiling(a/b)=ceiling(1500/512)=3,3*512=1536。此处假如b=1024,则Container实际内存大小为2048M 也就是说Container实际内存大小最小为yarn.scheduler.minimum-allocation-mb值,然后增加时的最小增加量为规整化因子b,最大不超过yarn.scheduler.maximum-allocation-mb 四、理解mapreduce.map.memory.mb、mapreduce.reduce.memory.mb "三"中提到的"打算给单个map任务申请内存资源"也就是a,其实就是指的"mapreduce.map.memory.mb"或"mapreduce.reduce.memory.mb",注意其值不要超过yarn.scheduler.maximum-allocation-mb 五、理解mapreduce.map.java.opts、mapreduce.reduce.java.opts 以map任务为例,Container其实就是在执行一个脚本文件,而脚本文件中,会执行一个 Java 的子进程,这个子进程就是真正的 Map Task,mapreduce.map.java.opts 其实就是启动 JVM 虚拟机时,传递给虚拟机的启动参数,而默认值 -Xmx200m 表示这个 Java 程序可以使用的最大堆内存数,一旦超过这个大小,JVM 就会抛出 Out of Memory 异常,并终止进程。而 mapreduce.map.memory.mb 设置的是 Container 的内存上限,这个参数由 NodeManager 读取并进行控制,当 Container 的内存大小超过了这个参数值,NodeManager 会负责 kill 掉 Container。在后面分析 yarn.nodemanager.vmem-pmem-ratio 这个参数的时候,会讲解 NodeManager 监控 Container 内存(包括虚拟内存和物理内存)及 kill 掉 Container 的过程。 也就是说,mapreduce.map.java.opts一定要小于mapreduce.map.memory.mb mapreduce.reduce.java.opts同mapreduce.map.java.opts一样的道理。 六、理解规整化因子指的是哪个参数 "三"中提到的规整化因子也就是b,具体指的是哪个参数和yarn使用的调度器有关,一共有三种调度器:capacity scheduler(默认调度器)、fair scheduler和fifo scheduler 当使用capacity scheduler或者fifo scheduler时,规整化因子指的就是参数yarn.scheduler.minimum-allocation-mb,不能单独配置,即yarn.scheduler.increment-allocation-mb无作用; 当使用fair scheduler时,规整化因子指的是参数yarn.scheduler.increment-allocation-mb 至此,关于yarn和mapreduce的任务内存配置问题讲完了,这也是我目前理解的层次。
posted @ 2017-08-30 21:05 xzc 阅读(37) | 评论 (0)编辑 收藏
  2017年8月14日
1. 日期输出格式化

所有日期、时间的api都在datetime模块内。

1. datetime => string

now = datetime.datetime.now()
now.strftime('%Y-%m-%d %H:%M:%S')
#输出2012-03-05 16:26:23.870105

strftime是datetime类的实例方法。

2. string => datetime

t_str = '2012-03-05 16:26:23'
d = datetime.datetime.strptime(t_str, '%Y-%m-%d %H:%M:%S')

strptime是datetime类的静态方法。

2. 日期比较操作

在datetime模块中有timedelta类,这个类的对象用于表示一个时间间隔,比如两个日期或者时间的差别。

构造方法:

datetime.timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

所有的参数都有默认值0,这些参数可以是int或float,正的或负的。

可以通过timedelta.days、tiemdelta.seconds等获取相应的时间值。

timedelta类的实例,支持加、减、乘、除等操作,所得的结果也是timedelta类的实例。比如:

year = timedelta(days=365)
ten_years = year *10
nine_years = ten_years - year

同时,date、time和datetime类也支持与timedelta的加、减运算。

datetime1 = datetime2 +/- timedelta
timedelta = datetime1 - datetime2

这样,可以很方便的实现一些功能。

1. 两个日期相差多少天。

d1 = datetime.datetime.strptime('2012-03-05 17:41:20', '%Y-%m-%d %H:%M:%S')
d2 = datetime.datetime.strptime('2012-03-02 17:41:20', '%Y-%m-%d %H:%M:%S')
delta = d1 - d2
print delta.days

输出:3

2. 今天的n天后的日期。

now = datetime.datetime.now()
delta = datetime.timedelta(days=3)
n_days = now + delta
print n_days.strftime('%Y-%m-%d %H:%M:%S')

输出:2012-03-08 17:44:50

复制代码
#coding=utf-8
import datetime
now=datetime.datetime.now()
print now
#将日期转化为字符串 datetime => string
print now.strftime('%Y-%m-%d %H:%M:%S')

t_str = '2012-03-05 16:26:23'
#将字符串转换为日期 string => datetime
d=datetime.datetime.strptime(t_str,'%Y-%m-%d %H:%M:%S')
print d

#在datetime模块中有timedelta类,这个类的对象用于表示一个时间间隔,比如两个日#期或者时间的差别。

#计算两个日期的间隔
d1 = datetime.datetime.strptime('2012-03-05 17:41:20', '%Y-%m-%d %H:%M:%S')
d2 = datetime.datetime.strptime('2012-03-02 17:41:20', '%Y-%m-%d %H:%M:%S')
delta = d1 - d2
print delta.days
print delta

#今天的n天后的日期。
now=datetime.datetime.now()
delta=datetime.timedelta(days=3)
n_days=now+delta
print n_days.strftime('%Y-%m-%d %H:%M:%S')
复制代码
posted @ 2017-08-14 23:09 xzc 阅读(75) | 评论 (0)编辑 收藏
  2017年8月2日

Shell中并没有真正意义的多线程,要实现多线程可以启动多个后端进程,最大程度利用cpu性能。

直接看代码示例吧。

(1) 顺序执行的代码

复制代码
 1 #!/bin/bash  2 date  3 for i in `seq 1 5`  4 do  5 {  6     echo "sleep 5"  7     sleep 5  8 }  9 done 10 date 
复制代码

输出:

复制代码
Sat Nov 19 09:21:51 CST 2016 sleep 5 sleep 5 sleep 5 sleep 5 sleep 5 Sat Nov 19 09:22:16 CST 2016
复制代码

(2) 并行代码

使用'&'+wait 实现“多进程”实现

复制代码
 1 #!/bin/bash  2 date  3 for i in `seq 1 5`  4 do  5 {  6     echo "sleep 5"  7     sleep 5  8 } &  9 done 10 wait  ##等待所有子后台进程结束 11 date
复制代码

输出:

复制代码
Sat Nov 19 09:25:07 CST 2016 sleep 5 sleep 5 sleep 5 sleep 5 sleep 5 Sat Nov 19 09:25:12 CST 2016
复制代码

 (3) 对于大量处理任务如何实现启动后台进程的数量可控?

  简单的方法可以使用2层for/while循环实现,每次wait内层循环的多个后台程序执行完成

  但是这种方式的问题是,如果内层循环有“慢节点”可能导致整个任务的执行执行时间长。

  更高级的实现可以看(4)

(4) 使用命名管道(fifo)实现每次启动后台进程数量可控。 

复制代码
 1 #!/bin/bash  2   3 function my_cmd(){  4     t=$RANDOM  5     t=$[t%15]  6     sleep $t  7     echo "sleep $t s"  8 }  9  10 tmp_fifofile="/tmp/$$.fifo"  11 mkfifo $tmp_fifofile      # 新建一个fifo类型的文件 12 exec 6<>$tmp_fifofile     # 将fd6指向fifo类型 13 rm $tmp_fifofile    #删也可以 14  15 thread_num=5  # 最大可同时执行线程数量 16 job_num=100   # 任务总数 17  18 #根据线程总数量设置令牌个数 19 for ((i=0;i<${thread_num};i++));do 20     echo 21 done >&6  22  23 for ((i=0;i<${job_num};i++));do # 任务数量 24     # 一个read -u6命令执行一次,就从fd6中减去一个回车符,然后向下执行, 25     # fd6中没有回车符的时候,就停在这了,从而实现了线程数量控制 26     read -u6  27  28     #可以把具体的需要执行的命令封装成一个函数 29     {    30         my_cmd 31     } & 32  33     echo >&6 # 当进程结束以后,再向fd6中加上一个回车符,即补上了read -u6减去的那个 34 done 35  36 wait 37 exec 6>&- # 关闭fd6 38 echo "over"
复制代码

 

参考:http://lawrence-zxc.github.io/2012/06/16/shell-thread/

posted @ 2017-08-02 17:01 xzc 阅读(43) | 评论 (0)编辑 收藏
仅列出标题  下一页