庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

    Kafka这个linkedin开源的MQ,我在过去的blog简单介绍过。最近3周来,我的工作就是做它的一个Java移植版本,kafka是用scala写的,基于维护和定制的角度,这个拷贝的版本还是用Java。说拷贝,也不尽然,原理相同,但实现完全换过,从数据结构到通讯框架、通讯协议、程序组织,乃至一些重要功能点上都做了改进和更新。我将这个Java版本取名为metamorphosis,也就是卡夫卡的代表作《变形记》的英文名。

    在原版本上,目前做了如下改进:
1、协议替换为文本协议,整个协议类似memcached,文本协议的优点自不必说。通讯框架也是采用内部使用的通讯框架,减少工作量。

2、存储结构上也采用自定义结构,更简洁紧凑。

3、kafka原来只支持consumer和broker之间的服务查找和负载均衡,meta加入了producer和broker之间的服务查找和负载均衡。

4、Consumer API没有采用kafka的stream方式,而是同时实现同步获取和异步订阅两种方式,更接近JMS和Notify。

5、改进了服务器端文件recover的性能,采用并发多线程recover的方式(可选)。

6、添加了实时统计功能和协议,类似memcached的stats协议,响应透明号召。

7、客户端的连接复用。
   
    以后要做的事情,可能包括:
1、实现类似Mysql的master/slave方案,可能还要分为同步和异步两种模式。

2、分区扩展时候的数据自动迁移功能,做到无痛水平扩展。

3、高可用方案的另一个实现。

4、嵌入Http server做web管理。
  
    工作在本周初步告一段落,接下来是要做集成测试和压测等,我在两台8核16G的机器上分别部署服务器和客户端(订阅者发布者同在一台),做的一个简单压测数据如下:并发100个线程发送5000万消息并同时消费,1K大小的消息TPS可以达到3.8万,4K大小的消息TPS可以达到1.8万,服务器load都维持在一个较低的水平。从这个数据来看,超过我一开始的预期。后续可能做下kakfa的测试对比下。


posted @ 2011-05-07 10:46 dennis 阅读(5514) | 评论 (6)编辑 收藏

    无论是消息系统,还是配置管理中心,甚至存储系统,你都要面临这样一个选择,push模型 or pull模型?是服务端主动给客户端推送数据,还是客户端去服务器拉数据,一张图表对比如下:
 
push模型 pull模型
描述 服务端主动发送数据给客户端 客户端主动从服务端拉取数据,通常客户端会定时拉取
实时性 较好,收到数据后可立即发送给客户端 一般,取决于pull的间隔时间
服务端状态 需要保存push状态,哪些客户端已经发送成功,哪些发送失败 服务端无状态
 客户端状态  无需额外保存状态 需保存当前拉取的信息的状态,以便在故障或者重启的时候恢复
状态保存 集中式,集中在服务端 分布式,分散在各个客户端
负载均衡 服务端统一处理和控制 客户端之间做分配,需要协调机制,如使用zookeeper
其他

服务端需要做流量控制,无法最大化客户端的处理能力。

其次,在客户端故障情况下,无效的push对服务端有一定负载。

客户端的请求可能很多无效或者没有数据可供传输,浪费带宽和服务器处理能力
缺点方案 服务器端的状态存储是个难点,可以将这些状态转移到DB或者key-value存储,来减轻server压力。

针对实时性的问题,可以将push加入进来,push小数据的通知信息,让客户端再来主动pull。

针对无效请求的问题,可以设置逐渐延长间隔时间的策略,以及合理设计协议尽量缩小请求数据包来节省带宽。



在面对大量甚至海量客户端的时候,使用push模型,保存大量的状态信息是个沉重的负担,加上复制N份数据分发的压力,也会使得实时性这唯一的优点也被放小。使用pull模型,通过将客户端状态保存在客户端,大大减轻了服务器端压力,通过客户端自身做流量控制也更容易,更能发挥客户端的处理能力,但是需要面对如何在这些客户端之间做协调的难题。

posted @ 2011-04-30 01:06 dennis 阅读(4520) | 评论 (1)编辑 收藏



    HandlerSocket是一个mysql插件,可以将mysql作为NoSQL来使用,具体可以看我过去写的这篇Bloghs4j是HandlerSocket的一个java客户端,自认为它比日本人写的那个客户端更实用和易用一些。写完好久,经过不少朋友使用和测试,现在正式发一个0.1版本,并已同步到maven中心仓库。

    项目主页:http://code.google.com/p/hs4j/
    项目描述:hs4j is a practical java client for HandlerSocket,it is nio based and turned to get better performance.
    使用文档:http://code.google.com/p/hs4j/w/list
    下载地址:http://code.google.com/p/hs4j/downloads/list
    源码仓库:https://github.com/killme2008/hs4j

     如果你使用maven2,可以直接引用:
<dependency>
  
<groupId>com.googlecode.hs4j</groupId>
  
<artifactId>hs4j</artifactId>
  
<version>0.1</version>
</dependency>

     有疑问和bug请联系我。

posted @ 2011-03-29 06:55 dennis 阅读(3601) | 评论 (3)编辑 收藏

     Xmemcached是一个开源的java memcached client,具有高性能、更易用、功能完善等优点,距离上次发布1.3.1已经超过两个月,现在正式发布1.3.2这个新版本,主要的改进如下:


1、Bug修复,从1.3.1版本以来发现的bug并修复,包括:

issue 112:: 新引入的failure模式在启动的时候,如果memcached故障,运行不符合预期的bug.

issue 113: 新增加一个delete方法,可以设置操作超时

public boolean delete(final String key, long opTimeout)
            
throws TimeoutException, InterruptedException, MemcachedException;

2、性能调优,存储操作(set/add/replace/prepend/append/cas)的性能提升5%。

3、修复pom.xml,使得xmemcached可以在其他机器上编译。

4、使用github作为源码仓库,版本管理使用git替换svn,源码转移到

      https://github.com/killme2008/xmemcached

新版本下载地址:

    http://code.google.com/p/xmemcached/downloads/list

使用maven可以直接引用: 

<dependency>
      <groupId>com.googlecode.xmemcached</groupId>
      
<artifactId>xmemcached</artifactId>
      
<version>1.3.2</version>
 
</dependency>

项目文档:

http://code.google.com/p/xmemcached/w/list

posted @ 2011-03-27 14:06 dennis 阅读(2996) | 评论 (1)编辑 收藏


    在国内,Clojure语言的用户估计是小众中的小众,没有多少人听说,也没有多少人使用,资料也大多数是英文的,讨论也只能上国外论坛。因此,我想建立一个CN-Clojure的google group,供大家交流和学习clojure语言。群组地址(需要翻墙):http://groups.google.com/group/cn-clojure

   现在没人,就我一个。我也会在群组里放些学习资料,欢迎任何对clojure感兴趣的朋友加入。

posted @ 2011-01-28 19:39 dennis 阅读(3848) | 评论 (5)编辑 收藏

     昨天晚上用clojure搞了个scheme解释器,基本上是sicp里的解释器的clojure翻译版本,可能唯一值的一提的是对transient集合的使用,实现副作用的set!。总共代码包含注释才366行,支持的feature包括

Feature Supported Comment
define yes
lambda yes
variable lookup yes
primitive procedure evaluation yes
compound procedure evaluation yes no tail recursion yet
if yes
cond yes
let yes

let* yes
no named let* yet
letrec no
begin yes

set! yes

quote yes
quasiquote no
unquote no
delay no
define-syntax no

       支持的primitive procedure包括常见的四则运算、car/cdr、list以及display、newline等。代码放在了github上:https://github.com/killme2008/cscheme,有兴趣的可以玩玩吧。

posted @ 2011-01-24 10:42 dennis 阅读(3772) | 评论 (0)编辑 收藏


    最近因为空闲时间有一些,所以去看了不少开源项目,大部分东西如果看过不记录下来,其实还是相当于没看,所以想想还是有必要摘要记录一下。

    首先是去了解了zookeeper这个项目,基于paxos算法的分布式服务组件,同事对此有非常深入的研究和介绍,具体可以看我们的团队Blog。令我感慨的是这么一个非常难以理解的算法,却用一个简单的树状目录模型表达出来,并且在这个模型的基础上衍生出种种应用:集群感知、分布式锁、分布式队列、分布式并发原语等等,具体可以看文档给出的recipes。在实现这些应用的时候,突出强调的是避免网络风暴,例如分布式锁的实现,竞争创建子节点,节点序列号最小的获取锁,其他节点等待,但是等待在什么条件上是有讲究的,如果所有节点都等待最小节点的删除事件,那么当最小节点释放锁的时候,就需要广播消息给所有其他等待的节点;换一个思路,如果每个等待节点只是等待比它序列号小的节点上,那么就可以避免这种广播风暴,变成一个顺序唤醒的过程。因此尽管有了zookeeper帮助实现分布式这些服务,但是要实现好仍然有一定难度,具体可以参考官方例子。我本来萌生了基于zookeeper实现一套封装好的类似j.u.c的服务框架,后来在邮件列表发现已经有人搞了这么一个基础类库放在github上:https://github.com/openUtility/menagerie 。不过我没有继续深入了,有兴趣的朋友可以瞧瞧。

    然后又去看了我们淘宝开源的TimeTunnel。TimeTunnel你可以理解成一个消息中间件,它整个设计跟我们的产品相当接近,但是两者的目的完全不同,tt强调的是高吞吐量,而notify强调的则是可靠性。TT的通讯层直接采用Facebook的thrift,并且利用zookeeper做集群管理和路由。TT的代码质量很好,有兴趣可以拉出来看一下,并且对zookeeper的应用也是一个典型的案例。TT在高可用性上的方案也很有特色,所有的服务器节点形成一个环,两两相互主辅备份,一个节点挂了,后续节点仍然可以提供服务直到主节点回来,有点类似一致性哈希的概念。节点的主从关系和顺序也是通过zookeeper保证。消息顺序的实现是通过称为router的路由到固定节点做传输,router默认是策略不是固定而是RR。TT的数据存储优先放在内存,并设置了一个内存状况监视的组件,当发现内存放不下的时候,swap到磁盘文件缓存,实现类似内存换页的功能。正常情况数据都应该在内存,当然如果可靠级别要求高的话可以先存磁盘再传输。TT目前仍然还是比较适合传输日志这样的文本增量数据,并且提供了TailFile这样的python脚本帮你做这个事情,这个脚本可以通过checkpoint做断点续传。在学习这个项目的时候,发现文档有很大问题,要么错误,要么遗漏,并且代码也不是最新的,我估计开源出来外面的人用的还不太多,希望慢慢能搞的更好一些。

    跟TT类似,另一个追求高吞吐量的MQ是linkedin开源的kafka。Kafka就跟这个名字一样,设计非常独特。首先,kafka的开发者们认为不需要在内存里缓存什么数据,操作系统的文件缓存已经足够完善和强大,只要你不搞随机写,顺序读写的性能是非常高效的。kafka的数据只会顺序append,数据的删除策略是累积到一定程度或者超过一定时间再删除。Kafka另一个独特的地方是将消费者信息保存在客户端而不是MQ服务器,这样服务器就不用记录消息的投递过程,每个客户端都自己知道自己下一次应该从什么地方什么位置读取消息,消息的投递过程也是采用客户端主动pull的模型,这样大大减轻了服务器的负担。Kafka还强调减少数据的序列化和拷贝开销,它会将一些消息组织成Message Set做批量存储和发送,并且客户端在pull数据的时候,尽量以zero-copy的方式传输,利用sendfile(对应java里的FileChannel.transferTo/transferFrom)这样的高级IO函数来减少拷贝开销。可见,kafka是一个精心设计,特定于某些应用的MQ系统,这种偏向特定领域的MQ系统我估计会越来越多,垂直化的产品策略值的考虑。

     在此期间,我还重新去看了activemq和hornetq的存储实现,从实现上大家都大同小异,append log + data file的模式。Activemq采用异步队列写来提高吞吐量,而Hornetq干脆就直接利用JNI调用原生aio来实现高性能。在搜索Java的aio实现的时候,碰巧发现Mina的沙箱里有个aioj的实现,源码在:https://svn.apache.org/repos/asf/mina/sandbox/mheath/aioj/ 。我测试了完全可用,也尝试改造我们的磁盘存储组件,可惜提升不多,估计不从整个设计上调整服务器,不大可能从aio上获益。

     最近也重新看起了clojure的一些开源项目,clojure的开源资源在github上也非常丰富,有待挖掘,下次有机会再尝试介绍一二。
  
   
   

posted @ 2011-01-20 23:23 dennis 阅读(7625) | 评论 (11)编辑 收藏

    写着玩的,不使用任何网络框架从头构建的echo server,总共77行。
 1 ;;Author:dennis (killme2008@gmail.com)
 2 (ns webee.network
 3    (:import (java.nio.channels Selector SocketChannel ServerSocketChannel SelectionKey)
 4             (java.net InetSocketAddress)
 5             (java.nio ByteBuffer)
 6             (java.io IOException)))
 7 
 8 (declare reactor process-keys accept-channel read-channel)
 9 
10 (defn bind [^InetSocketAddress addr fcol]
11   (let [selector (Selector/open)
12         ssc      (ServerSocketChannel/open)
13         ag  (agent selector)]
14     (do
15       (.configureBlocking ssc false)
16       (.. ssc (socket) (bind addr 1000))
17       (.register ssc selector SelectionKey/OP_ACCEPT)
18       (send-off ag reactor fcol)
19       ag)))
20 
21 (defn- reactor [^Selector selector fcol]
22   (let [sel (. selector select 1000)]
23     (if (> sel 0)
24       (let [sks (. selector selectedKeys)]
25         (do 
26           (dorun (map (partial process-keys selector fcol) sks))
27           (.clear sks))))
28     (recur selector fcol)))
29   
30 (defn- process-keys [^Selector selector ^SelectionKey fcol sk]
31   (try
32     (cond 
33       (.isAcceptable sk) (accept-channel sk  selector fcol)
34       (.isReadable sk) (read-channel sk selector fcol)    
35     )
36     (catch Throwable e (.printStackTrace e))))
37 
38 (defn- accept-channel [^SelectionKey sk ^Selector selector fcol]
39    (let [^ServerSocketChannel ssc (. sk channel)
40          ^SocketChannel sc (. ssc accept)
41          created-fn (:created fcol)]
42      (do 
43        (.configureBlocking sc false
44        (.register sc selector SelectionKey/OP_READ)
45        (if created-fn
46          (created-fn sc)))))
47 
48 (defn- close-channel [^SelectionKey sk ^SocketChannel sc fcol]
49   (let [closed-fn (:closed fcol)]
50     (do 
51        (.close sc)
52        (.cancel sk)
53        (if closed-fn 
54          (closed-fn sc)))))
55      
56 (defn-  read-channel [^SelectionKey sk ^Selector selector fcol]
57    (let [^SocketChannel sc (. sk channel)
58          ^ByteBuffer buf (ByteBuffer/allocate 4096)
59          read-fn (:read fcol)]
60      (try
61        (let [n (.read sc buf)]
62          (if (< n 0)
63              (close-channel sk sc fcol)
64              (do (.flip buf)
65                  (if read-fn
66                    (read-fn sc buf)))))
67        (catch IOException e
68          (close-channel sk sc fcol)))))
69 
70 ;;Bind a tcp server to localhost at port 8080,you can telnet it.
71 (def server
72   (bind 
73     (new InetSocketAddress 8080)
74     {:read #(.write %1 %2)
75      :created #(println "Accepted from" (.. % (socket) (getRemoteSocketAddress)))
76      :closed  #(println "Disconnected from" (.. % (socket) (getRemoteSocketAddress)))
77      }))


posted @ 2011-01-15 22:56 dennis 阅读(1881) | 评论 (0)编辑 收藏

      Xmemcached是一个开源的memcached的Java客户端,最近引入了一些关键特性,因此版本号直接从1.2.6.2升级到1.3.0。主要的更改如下:

1、引入了failure模式,所谓failure模式是指在当一个memcached由于各种原因不可用的情况下,发往这个节点的请求将直接抛出异常,而非使用下一个可用的节点。具体可以看memached的这个文档。默认不启用,启用failure模式很简单:

MemcachedClientBuilder builder=……
//启用failure模式。
builder.setFailureMode(true);

也可以采用spring配置。

2、在启用failure模式的情况下,允许为每个memcached设置一个备份节点,当主节点挂掉的情况下,会将请求转交给备份节点,主节点恢复后又自动切换到主节点。请注意,要设置备份节点的前提是启用failure模式。假设我们已经有两个memcached节点:host1:port和host2:port,为host1:port设置一个备份节点host3:port可以实现为:
MemcachedClientBuilder builder=new XmemcachedClientBuilder(AddrUtil.getAddressMap("host1:port,host3:port host2:port"))
……

主备节点之间用逗号隔开,不同分组之间用空格隔开,完全兼容1.2。并且当备份节点连接意外断开的情况下,xmemcached也会自动修复备份节点的连接并加入映射。

关于failure模式和standby节点更多内容可以参考这篇blog.

3、修正BUG和新功能,包括issue 104,issue 105,issue 107等。

项目主页 http://code.google.com/p/xmemcached/

下载地址 http://code.google.com/p/xmemcached/downloads/list

用户指南 http://code.google.com/p/xmemcached/wiki/TableOfContents

     如果你使用maven构建,可以直接引用:

<dependency>
<groupId>com.googlecode.xmemcached</groupId>
<artifactId>xmemcached</artifactId>
<version>1.3.1</version>
</dependency>


    更新:发布1.3.1了,如果你还在使用1.3.0,建议升级。1.3.0因为改变了memcached地址服务器顺序,可能导致原有的缓存失效。


posted @ 2011-01-04 20:10 dennis 阅读(2912) | 评论 (0)编辑 收藏


    首先,还是利用下这个小工具,查看下我10年读过的书,看过的电影




    读书:读的太少,也可以看到,技术方面的更少,如果要说推荐,我只会推荐《Programming Clojure》作为学习clojure的入门,并且推荐《构建高性能web站点》作为了解一个网站构建的方放面面的入门书。

    电影:今年进电影院的次数也寥寥无几,主要还是重看了莱昂纳多的作品,《盜梦空间》很惊艳,《钢铁侠2》很失望。

     去年的愿望:读完《算法导论》——2/3,继续深入Erlang,探索Erlang在工作中的实际应用——几乎没有,加强对其他系统的了解以及大型网站构建方面的学习——小小一些了解,希望能全家一起去旅游一次,希望能将老爸老妈接过来玩一段时间——没有做到。
  
     工作:状态并不好,还是尝试努力去做了一些事情,包括参与一些分享,更多参与他人的代码复查和设计审查等。抱怨、牢骚少了一些,相对淡定了。

     2011年:还是不谈大的愿望,从以往的经验来说,很难靠谱。也许有一个相对明晰的目标:提高自制力和计划性。


posted @ 2011-01-01 09:08 dennis 阅读(2074) | 评论 (3)编辑 收藏

仅列出标题
共56页: First 上一页 3 4 5 6 7 8 9 10 11 下一页 Last