#
kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。
kakfa的consumer使用拉的方式工作。
安装kafka下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz
> tar xzf kafka-.tgz
> cd kafka-
> ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。
使用kafka
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.SyncProducer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
Properties props =
new Properties();
props.put(“zk.connect”, “127.0.0.1:2181”);
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config =
new ProducerConfig(props);
Producer<String, String> producer =
new Producer<String, String>(config);
Send a single message
// The message is sent to a randomly selected partition registered in ZK
ProducerData<String, String> data =
new ProducerData<String, String>("test-topic", "test-message");
producer.send(data);
producer.close();
这样就是一个标准的producer。
consumer的代码
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for(final KafkaMessageStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(Message message: stream) {
// process message
}
}
});
}
日志抓取端:
apache kafka在数据处理中特别是日志和消息的处理上会有很多出色的表现,这里写个索引,关于kafka的文章暂时就更新到这里,最近利用空闲时间在对kafka做一些功能性增强,并java化,虽然现在已经有很多这样的版本,但是根据实际需求来改变才是最适合的。
首先当然推荐的是kafka的官网 http://kafka.apache.org/
在官网最值得参考的文章就是kafka design:http://kafka.apache.org/design.html,我的文章也基本都是参照这里的说明,大家要特别重视这篇文章,里面有好多理念都特别好,推荐多读几遍。
在OSC的翻译频道有kafka design全中文的翻译,翻得挺好的,推荐一下:http://www.oschina.net/translate/kafka-design
kafka的wiki是很不错的学习文档:https://cwiki.apache.org/confluence/display/KAFKA/Index
——————————————————————————————————
接下来就是我写的一系列文章,文章都是循序渐进的方式带你了解kafka:
关于kafka的基本知识,分布式的基础:《分布式消息系统Kafka初步》
kafka的分布式搭建,quick start:《kafka分布式环境搭建》
关于kafka的实现细节,这主要就是讲design的部分:《细节上》、《细节下》
关于kafka开发环境,scala环境的搭建:《开发环境搭建》
数据生产者,producer的用法:《producer的用法》、《producer使用注意》
数据消费者,consumer的用法:《consumer的用法》
还有些零碎的,关于通信段的源码解读:《net包源码解读》、《broker配置》
——————————————————————————————————
扩展的阅读还有下面这些:
我的好友写的关于kafka和jafka的相关博客,特别好,我有很多问题也都找他解决的,大神一般的存在:http://rockybean.github.com/ @rockybean
kafka的java化版本jafka:https://github.com/adyliu/jafka
淘宝的metaQ:https://github.com/killme2008/Metamorphosis
我最近在写的inforQ,刚开始写,我也纯粹是为了读下源码,不定期更新哈:https://github.com/ielts0909/inforq
后面一阶段可能更新点儿关于cas的东西吧,具体也没想好,最近一直出差,写代码的时间都很少
--------------------------------------------------------------------------------
0.8版本的相关更新如下:
0.8更新内容介绍:《kafka0.8版本的一些更新》
- get the Whirr tar file
wget http://www.eu.apache.org/dist/whirr/stable/whirr-0.8.2.tar.gz
- untar the Whirr tar file
tar -vxf whirr-0.8.2.tar.gz
- create credentials file
mkdir ~/.whirr
cp conf/credentials.sample ~/.whirr/credentials
- add the following content to credentials file
# Set cloud provider connection details
PROVIDER=aws-ec2
IDENTITY=<AWS Access Key ID>
CREDENTIAL=<AWS Secret Access Key>
generate a rsa key pair
ssh-keygen -t rsa -P ''
- create a hadoop.properties file and add the following content
whirr.cluster-name=whirrhadoopcluster
whirr.instance-templates=1 hadoop-jobtracker+hadoop-namenode,2 hadoop-datanode+hadoop-tasktracker
whirr.provider=aws-ec2
whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub
whirr.hadoop.version=1.0.2
whirr.aws-ec2-spot-price=0.08
- launch hadoop
bin/whirr launch-cluster --config hadoop.properties
- launch proxy
cd ~/.whirr/whirrhadoopcluster/
./hadoop-proxy.sh
- add a rule to iptables
0.0.0.0/0 50030
0.0.0.0/0 50070
- check the web ui in the browser
http://<aws-public-dns>:50030
- add to /etc/profile
export HADOOP_CONF_DIR=~/.whirr/whirrhadoopcluster/
- check if the hadoop works
hadoop fs -ls /
如果简单地比较Redis与Memcached的区别,大多数都会得到以下观点:
1 Redis不仅仅支持简单的k/v类型的数据,同时还提供list,set,hash等数据结构的存储。
2 Redis支持数据的备份,即master-slave模式的数据备份。
3 Redis支持数据的持久化,可以将内存中的数据保持在磁盘中,重启的时候可以再次加载进行使用。
在Redis中,并不是所有的数据都一直存储在内存中的。这是和Memcached相比一个最大的区别(我个人是这么认为的)。
Redis只会缓存所有的key的信息,如果Redis发现内存的使用量超过了某一个阀值,将触发swap的操作,Redis根据“swappability = age*log(size_in_memory)”计算出哪些key对应的value需要swap到磁盘。然后再将这些key对应的value持久化到磁盘中,同时在内存中清除。这种特性使得Redis可以保持超过其机器本身内存大小的数据。当然,机器本身的内存必须要能够保持所有的key,毕竟这些数据是不会进行swap操作的。
同时由于Redis将内存中的数据swap到磁盘中的时候,提供服务的主线程和进行swap操作的子线程会共享这部分内存,所以如果更新需要swap的数据,Redis将阻塞这个操作,直到子线程完成swap操作后才可以进行修改。
可以参考使用Redis特有内存模型前后的情况对比:
VM off: 300k keys, 4096 bytes values: 1.3G used
VM on: 300k keys, 4096 bytes values: 73M used
VM off: 1 million keys, 256 bytes values: 430.12M used
VM on: 1 million keys, 256 bytes values: 160.09M used
VM on: 1 million keys, values as large as you want, still: 160.09M used
当从Redis中读取数据的时候,如果读取的key对应的value不在内存中,那么Redis就需要从swap文件中加载相应数据,然后再返回给请求方。这里就存在一个I/O线程池的问题。在默认的情况下,Redis会出现阻塞,即完成所有的swap文件加载后才会相应。这种策略在客户端的数量较小,进行批量操作的时候比较合适。但是如果将Redis应用在一个大型的网站应用程序中,这显然是无法满足大并发的情况的。所以Redis运行我们设置I/O线程池的大小,对需要从swap文件中加载相应数据的读取请求进行并发操作,减少阻塞的时间。
redis、memcache、mongoDB 对比
从以下几个维度,对redis、memcache、mongoDB 做了对比,欢迎拍砖
1、性能
都比较高,性能对我们来说应该都不是瓶颈
总体来讲,TPS方面redis和memcache差不多,要大于mongodb
2、操作的便利性
memcache数据结构单一
redis丰富一些,数据操作方面,redis更好一些,较少的网络IO次数
mongodb支持丰富的数据表达,索引,最类似关系型数据库,支持的查询语言非常丰富
3、内存空间的大小和数据量的大小
redis在2.0版本后增加了自己的VM特性,突破物理内存的限制;可以对key value设置过期时间(类似memcache)
memcache可以修改最大可用内存,采用LRU算法
mongoDB适合大数据量的存储,依赖操作系统VM做内存管理,吃内存也比较厉害,服务不要和别的服务在一起
4、可用性(单点问题)
对于单点问题,
redis,依赖客户端来实现分布式读写;主从复制时,每次从节点重新连接主节点都要依赖整个快照,无增量复制,因性能和效率问题,
所以单点问题比较复杂;不支持自动sharding,需要依赖程序设定一致hash 机制。
一种替代方案是,不用redis本身的复制机制,采用自己做主动复制(多份存储),或者改成增量复制的方式(需要自己实现),一致性问题和性能的权衡
Memcache本身没有数据冗余机制,也没必要;对于故障预防,采用依赖成熟的hash或者环状的算法,解决单点故障引起的抖动问题。
mongoDB支持master-slave,replicaset(内部采用paxos选举算法,自动故障恢复),auto sharding机制,对客户端屏蔽了故障转移和切分机制。
5、可靠性(持久化)
对于数据持久化和数据恢复,
redis支持(快照、AOF):依赖快照进行持久化,aof增强了可靠性的同时,对性能有所影响
memcache不支持,通常用在做缓存,提升性能;
MongoDB从1.8版本开始采用binlog方式支持持久化的可靠性
6、数据一致性(事务支持)
Memcache 在并发场景下,用cas保证一致性
redis事务支持比较弱,只能保证事务中的每个操作连续执行
mongoDB不支持事务
7、数据分析
mongoDB内置了数据分析的功能(mapreduce),其他不支持
8、应用场景
redis:数据量较小的更性能操作和运算上
memcache:用于在动态系统中减少数据库负载,提升性能;做缓存,提高性能(适合读多写少,对于数据量比较大,可以采用sharding)
MongoDB:主要解决海量数据的访问效率问题
Hive是建立在Hadoop上的数据仓库基础构架。它提供了一系列的工具,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。Hive 定义了简单的类 SQL 查询语言,称为 HQL,它允许熟悉 SQL 的用户查询数据。同时,这个语言也允许熟悉 MapReduce 开发者的开发自定义的 mapper 和 reducer 来处理内建的 mapper 和 reducer 无法完成的复杂的分析工作。
Hive 没有专门的数据格式。 Hive 可以很好的工作在 Thrift 之上,控制分隔符,也允许用户指定数据格式
hive与关系数据库的区别:
数据存储不同:hive基于hadoop的HDFS,关系数据库则基于本地文件系统
计算模型不同:hive基于hadoop的mapreduce,关系数据库则基于索引的内存计算模型
应用场景不同:hive是OLAP数据仓库系统提供海量数据查询的,实时性很差;关系数据库是OLTP事务系统,为实时查询业务服务
扩展性不同:hive基于hadoop很容易通过分布式增加存储能力和计算能力,关系数据库水平扩展很难,要不断增加单机的性能
Hive安装及使用攻略
http://blog.fens.me/hadoop-hive-intro/
R利剑NoSQL系列文章 之 Hive
http://cos.name/2013/07/r-nosql-hive/
摘要: This document is for Anyela Chavarro.
Only these version of each framework work together
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
-->H...
阅读全文
运行MAPREDUCE JOB时,如果输入的文件比较小而多时,默认情况下会生成很多的MAP JOB,即一个文件一个MAP JOB,因此需要优化,使多个文件能合成一个MAP JOB的输入。
具体的原理是下述三步:
1.根据输入目录下的每个文件,如果其长度超过mapred.max.split.size,以block为单位分成多个split(一个split是一个map的输入),每个split的长度都大于mapred.max.split.size, 因为以block为单位, 因此也会大于blockSize, 此文件剩下的长度如果大于mapred.min.split.size.per.node, 则生成一个split, 否则先暂时保留.
2. 现在剩下的都是一些长度效短的碎片,把每个rack下碎片合并, 只要长度超过mapred.max.split.size就合并成一个split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一个split, 否则暂时保留.
3. 把不同rack下的碎片合并, 只要长度超过mapred.max.split.size就合并成一个split, 剩下的碎片无论长度, 合并成一个split.
举例: mapred.max.split.size=1000
mapred.min.split.size.per.node=300
mapred.min.split.size.per.rack=100
输入目录下五个文件,rack1下三个文件,长度为2050,1499,10, rack2下两个文件,长度为1010,80. 另外blockSize为500.
经过第一步, 生成五个split: 1000,1000,1000,499,1000. 剩下的碎片为rack1下:50,10; rack2下10:80
由于两个rack下的碎片和都不超过100, 所以经过第二步, split和碎片都没有变化.
第三步,合并四个碎片成一个split, 长度为150.
如果要减少map数量, 可以调大mapred.max.split.size, 否则调小即可.
其特点是: 一个块至多作为一个map的输入,一个文件可能有多个块,一个文件可能因为块多分给做为不同map的输入, 一个map可能处理多个块,可能处理多个文件。
注:CombineFileInputFormat是一个抽象类,需要编写一个继承类。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
@SuppressWarnings("deprecation")
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
return new CombineFileRecordReader(conf, (CombineFileSplit) split, reporter, (Class) myCombineFileRecordReader.class);
}
public static class myCombineFileRecordReader implements RecordReader<LongWritable, Text> {
private final LineRecordReader linerecord;
public myCombineFileRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index), split.getLength(index), split.getLocations());
linerecord = new LineRecordReader(conf, filesplit);
}
@Override
public void close() throws IOException {
linerecord.close();
}
@Override
public LongWritable createKey() {
// TODO Auto-generated method stub
return linerecord.createKey();
}
@Override
public Text createValue() {
// TODO Auto-generated method stub
return linerecord.createValue();
}
@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return linerecord.getPos();
}
@Override
public float getProgress() throws IOException {
// TODO Auto-generated method stub
return linerecord.getProgress();
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
// TODO Auto-generated method stub
return linerecord.next(key, value);
}
}
}
在运行时这样设置:
if (argument !=
null) {
conf.set("mapred.max.split.size", argument);
}
else {
conf.set("mapred.max.split.size", "134217728");
// 128 MB
}
//
conf.setInputFormat(CombinedInputFormat.
class);