庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

    最近有朋友给我邮件问一些storm的问题,集中解答在这里。
一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算?

你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算。怎么实现spout可以参考官方的kestrel spout实现:
https://github.com/nathanmarz/storm-kestrel

如果你的数据源不支持事务性消费,那么就无法得到storm提供的可靠处理的保证,也没必要实现ISpout接口中的ack和fail方法。

二、Storm为了保证tuple的可靠处理,需要保存tuple信息,这会不会导致内存OOM?

Storm为了保证tuple的可靠处理,acker会保存该节点创建的tuple id的xor值,这称为ack value,那么每ack一次,就将tuple id和ack value做异或(xor)。当所有产生的tuple都被ack的时候, ack value一定为0。这是个很简单的策略,对于每一个tuple也只要占用约20个字节的内存。对于100万tuple,也才20M左右。关于可靠处理看这个:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

三、Storm计算后的结果保存在哪里?可以保存在外部存储吗?

Storm不处理计算结果的保存,这是应用代码需要负责的事情,如果数据不大,你可以简单地保存在内存里,也可以每次都更新数据库,也可以采用NoSQL存储。storm并没有像s4那样提供一个Persist API,根据时间或者容量来做存储输出。这部分事情完全交给用户。

数据存储之后的展现,也是你需要自己处理的,storm UI只提供对topology的监控和统计。

四、Storm怎么处理重复的tuple?

因为Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并重新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用mysql,memcached或者redis根据逻辑主键来去重。
(3)使用bloom filter做过滤,简单高效。

五、Storm的动态增删节点

我在storm和s4里比较里谈到的动态增删节点,是指storm可以动态地添加和减少supervisor节点。对于减少节点来说,被移除的supervisor上的worker会被nimbus重新负载均衡到其他supervisor节点上。在storm 0.6.1以前的版本,增加supervisor节点不会影响现有的topology,也就是现有的topology不会重新负载均衡到新的节点上,在扩展集群的时候很不方便,需要重新提交topology。因此我在storm的邮件列表里提了这个问题,storm的开发者nathanmarz创建了一个issue 54并在0.6.1提供了rebalance命令来让正在运行的topology重新负载均衡,具体见:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的变更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm并不提供机制来动态调整worker和task数目。

六、Storm UI里spout统计的complete latency的具体含义是什么?为什么emit的数目会是acked的两倍?
这个事实上是storm邮件列表里的一个问题。Storm作者marz的解答:
The complete latency is the time from the spout emitting a tuple to that
tuple being acked on the spout
. So it tracks the time 
for the whole tuple
tree to be processed.

If you dive into the spout component in the UI, you
'll see that a lot of
the emitted/transferred is on the __ack* stream. This is the spout
communicating with the ackers which take care of tracking the tuple trees.


简单地说,complete latency表示了tuple从emit到被acked经过的时间,可以认为是tuple以及该tuple的后续子孙(形成一棵树)整个处理时间。其次spout的emit和transfered还统计了spout和acker之间内部的通信信息,比如对于可靠处理的spout来说,会在emit的时候同时发送一个_ack_init给acker,记录tuple id到task id的映射,以便ack的时候能找到正确的acker task。

posted @ 2011-12-19 15:25 dennis 阅读(14896) | 评论 (9)编辑 收藏

    原文:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html
    作者:dennis (killme2008@gmail.com)
    转载请注明出处。

    最近一直在读twitter开源的这个分布式流计算框架——storm的源码,还是有必要记录下一些比较有意思的地方。我按照storm的主要概念进行组织,并且只分析我关注的东西,因此称之为浅析。       

一、介绍
    Storm的开发语言主要是Java和Clojure,其中Java定义骨架,而Clojure编写核心逻辑。源码统计结果:
     180 text files.
     
177 unique files.                                          
       
7 files ignored.

http:
//cloc.sourceforge.net v 1.55  T=1.0 s (171.0 files/s, 46869.0 lines/s)
-------------------------------------------------------------------------------
Language                     files          blank        comment           code
-------------------------------------------------------------------------------
Java                           
125           5010           2414          25661
Lisp                            
33            732            283           4871
Python                           
7            742            433           4675
CSS                              
1             12             45           1837
ruby                             
2             22              0            104
Bourne Shell                     
1              0              0              6
Javascript                       
2              1             15              6
-------------------------------------------------------------------------------
SUM:                           
171           6519           3190          37160
-------------------------------------------------------------------------------

    Java代码25000多行,而Clojure(Lisp)只有4871行,说语言不重要再次证明是扯淡。
        
二、Topology和Nimbus       
    Topology是storm的核心理念,将spout和bolt组织成一个topology,运行在storm集群里,完成实时分析和计算的任务。这里我主要想介绍下topology部署到storm集群的大概过程。提交一个topology任务到Storm集群是通过StormSubmitter.submitTopology方法提交:
StormSubmitter.submitTopology(name, conf, builder.createTopology());
    我们将topology打成jar包后,利用bin/storm这个python脚本,执行如下命令:
bin/storm jar xxxx.jar com.taobao.MyTopology args
    将jar包提交给storm集群。storm脚本会启动JVM执行Topology的main方法,执行submitTopology的过程。而submitTopology会将jar文件上传到nimbus,上传是通过socket传输。在storm这个python脚本的jar方法里可以看到:
def jar(jarfile, klass, *args):                                                                                                                               
   exec_storm_class(                                                                                                                                          
        klass,                                                                                                                                                
        jvmtype
="-client",                                                                                                                                    
        extrajars
=[jarfile, CONF_DIR, STORM_DIR + "/bin"],                                                                                                    
        args
=args,                                                                                                                                            
        prefix
="export STORM_JAR=" + jarfile + ";")
     将jar文件的地址设置为环境变量STORM_JAR,这个环境变量在执行submitTopology的时候用到:
//StormSubmitter.java 
private static void submitJar(Map conf) {
        
if(submittedJar==null) {
            LOG.info(
"Jar not uploaded to master yet. Submitting jar");
            String localJar 
= System.getenv("STORM_JAR");
            submittedJar 
= submitJar(conf, localJar);
        } 
else {
            LOG.info(
"Jar already uploaded to master. Not submitting jar.");
        }
    }
    通过环境变量找到jar包的地址,然后上传。利用环境变量传参是个小技巧。

    其次,nimbus在接收到jar文件后,存放到数据目录的inbox目录,nimbus数据目录的结构
-nimbus
     
-inbox
         
-stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar
         
-stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar

     
-stormdist
        
-storm-id
           
-stormjar.jar
           
-stormconf.ser
           
-stormcode.ser
     其中inbox用于存放提交的jar文件,每个jar文件都重命名为stormjar加上一个32位的UUID。而stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。
    进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
4.开始分配任务(内部称为assignment), 具体步骤:
 (1)从zk上获得已有的assignment(新的toplogy当然没有了)
 (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
 (3)将任务均匀地分配给可用的worker,这里有两种情况:
 (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配
{1: [host1:port1] 2 : [host2:port1]
         
3 : [host1:port1] 4 : [host2:port1]}
,可以看到任务平均地分配在两个worker上。
(b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成
[host1:port1 host2:port1 host1:port2 host2:port2]
,然后分配任务为
{1: host1:port1 , 2 : host2:port2}

(4)记录启动时间
(5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

posted @ 2011-12-01 21:48 dennis 阅读(15182) | 评论 (10)编辑 收藏


    所谓兵马未动,粮草先行,准备将storm用在某个项目中做实时数据分析。无论任何系统,一定要有监控系统并存,当故障发生的时候你能第一个知道,而不是让别人告诉你,那处理故障就很被动了。

    因此我写了这么个项目,取名叫storm-monitor,放在了github上

     https://github.com/killme2008/storm-monitor

    主要功能如下:
1.监控supervisor数目是否正确,当supervisor挂掉的时候会发送警告。
2.监控nimbus是否正常运行,monitor会尝试连接nimbus,如果连接失败就认为nimbus挂掉。
3.监控topology是否正常运行,包括它是否正常部署,是否有运行中的任务。

    当故障发生的时候通过alarm方法警告用户,开放出去的只是简单地打日志。因为每个公司的告警接口不一样,所以你需要自己扩展,修改alarm.clj即可。我们这儿就支持旺旺告警和手机短信告警。

    基本的原理很简单,对supervisor和topology的监控是通过zookeeper来间接地监控,通过定期查看path是否存在。对nimbus的监控是每次起一个短连接连上去,连不上去即认为挂掉。

    整个项目也是用clojure写。你的机器需要安装leinexec插件,然后将你的storm.yaml拷贝到conf目录下,编辑monitor.yaml设定监控参数如检查间隔等,最后启动start.sh脚本即可。默认日志输出在logs/monitor.log。

posted @ 2011-12-01 21:02 dennis 阅读(10495) | 评论 (0)编辑 收藏


    在豆瓣发了一些牢骚,索性多说一些我个人对人对事的偏见,既然是偏见,就不会让人舒服,事先声明是扯淡,不想浪费时间的人略过。

1.我们要远离新浪微博,新浪微博跟twitter不一样,twitter是为了让每个人的信息的更好更快地传播而设计的,而新浪微博是为了让权威的声音更好更快地传播而设计的。迷恋上新浪微博,你要么是权威,要么是跟随权威。成为权威的,免不了沾沾自喜,真以为自己成了“权威”。更可怕的是你不可避免地要生活在相互吹捧和喧嚣中。

2.在编写代码之外,我们可能需要更多的手艺傍身,例如木匠或者厨师,以免在乱世的时候因为不需要程序员而饿死。ps.计算弹道轨迹的程序员除外。

3.据说真正的牛人从不跳槽,作为大多数不是牛人,以及已经远离牛人行列的我们(跳槽超过3次以上),跳槽仍然是你提升自己的有效途径,无论是薪水还是技术。

4.写简历的技巧,我慢慢领悟到了,少点技术术语,多点成效和应用,打动了HR过了第一关之后,再去跟技术人员扯淡。

5.简历要定时更新,你可以理解成定时提醒下猎头和HR,关注我啊,关注我啊。

6.强烈地拥抱文本化,配置文本化(没人会脑残地用二进制当配置文件吧?),协议文本化,婚姻文本化。

7.一切不以加薪为目的的挽留,都是耍流氓,这不是我的原创。

8.有趣比实用重要,没趣味的东西,给钱也不去做(好吧,我说假话)。

9.对新潮的东西保持一点警惕,如果这个东西三个月后还有人在谈论,那可以关注下

10.代码永远比文档、博客真实和靠谱,阅读代码习惯了,跟阅读文档没啥区别。

11.少关注博客和新闻,戒掉看google reader的习惯。现在更多地看maillist上的讨论和问题,真正重要的东西你永远不会错过。

12.不追求完美,等你完美的时候别人已经是事实标准。

13.大型的技术聚会不是为技术人员准备的,这是大公司给员工的度假福利和领导们的吹水时间。只有在小型的技术聚会上才能看到一些有价值的东西,任何稍微跟商业沾一点边的几乎都没有太大价值,我说的是国内。

14.80%的分享都只对演讲者有益,该sb的还是sb,该牛b的还是牛b。最有效的分享是结对编程和结对review。分享和培训最大的意义是让行政们觉的自己的存在价值很大。

15.国内翻译国外经典>国内原创精品>国外原版,这个原则对英语好的人除外。

16.极其讨厌要求强制缩进的语言,比如python。

17.标榜是一种人生态度,装B装久了你就真牛B了。

18.凭啥不造轮子,你们造轮子舒坦了,爽快了,就不让别人造了。我造轮子我快乐。

19.偏见不全是坏事,坏的是不愿意改变偏见。


    扯淡时间结束。

posted @ 2011-11-30 22:50 dennis 阅读(4434) | 评论 (14)编辑 收藏

今天看到的一个演示TCP慢启动和滑动窗口机制的动画,很形象
osischool.com

posted @ 2011-11-16 07:34 dennis 阅读(6531) | 评论 (3)编辑 收藏

Items\Projects
Yahoo! s4
Twitter Storm
协议
Apache license 2.0
Eclipse Public License 1.0
开发语言
Java
Clojure,Java,Clojure编写了核心代码
结构
去中心化的对等结构
有中心节点nimbus,但非关键
通信
可插拔的通讯层,目前是基于UDP的实现 基于facebook开源的thrift框架
事件/Stream
<K,A>序列,用户可自定义事件类 提供Tuple类,用户不可自定义事件类,
但是可以命名field和注册序列化器
处理单元 Processing Elements,内置PE处理
count,join和aggregate等常见任务
Bolt,没有内置任务,提供IBasicBolt处理
自动ack
第三方交互
提供API,Client Adapter/Driver,第三方客户端输入或者输出事件 定义Spout用于产生Stream,没有标准输出API
持久化 提供Persist API规范,可根据频率或者次数做
持久化
无特定API,用户可自行选择处理
可靠处理 无,可能会丢失事件 提供对事件处理的可靠保证(可选)
路由EventType + Keyed attribute + value匹配
内置count,join和aggregate标准任务
Stream Groupings:
Shuffle,Fields,All,Global,None,Direct
非常灵活的路由方式
多语言支持 暂时只支持Java多语言支持良好,本身支持Java,Clojure,
其他非JVM语言通过thrift和进程间通讯
Failover
 部分支持,数据无法failover 部分支持,数据同样无法failover
Load Balance
不支持 不支持
 并行处理 取决于节点数目,不可调节 可配置worker和task数目,storm会尽量将worker和task均匀分布
动态增删节点不支持
 支持
动态部署
 不支持 支持
web管理 不支持 支持
代码成熟度 半成品 成熟
活跃度 低 活跃
编程 编程 + XML配置
  纯编程
参考文档 http://docs.s4.io/https://github.com/nathanmarz/storm/wiki/
http://xumingming.sinaapp.com/category/storm/ (非常好的中文翻译)

posted @ 2011-11-08 22:25 dennis 阅读(13143) | 评论 (14)编辑 收藏

clj-xmemcached

    Clj-xmemcached is an opensource memcached client for clojure wrapping xmemcached. Xmemcached is an opensource high performance memcached client for java.

Leiningen Usage

To include clj-xmemcached,add:

     [clj-xmemcached "0.1.1"]

to your project.clj.

Usage

Create a client

(use [clj-xmemcached.core])
(def client (xmemcached 
"host:port"))
(def client (xmemcached 
"host1:port1 host2:port2" :protocol "binary"))

Then we create a memcached client using binary protocol to talk with memcached servers host1:port1 and host2:port2. Valid options including:

  :name       Client's name
  :protocol  Protocol to talk with memcached,a string value in text,binary or kestrel,default is text protocol.
  :hash          Hash algorithm,a string value in consistent or standard,default is standard hash.
  :timeout    Operation timeout in milliseconds,default is five seconds.
  :pool          Connection pool size,default is one.

Store items


(xset client "key" "dennis")
(xset client 
"key" "dennis" 100)
(xappend client 
"key" " zhuang")
(xprepend client 
"key" "hello,")

The value 100 is the expire time for the item in seconds.Store functions include xset,xadd,xreplace,xappend and xprepend.Please use doc to print documentation for these functions.

Get items

(xget client "key")
(xget client 
"key1" "key2" "key3")
(xgets client 
"key")

xgets returns a value including a cas value,for example:

  {:value "hello,dennis zhuang", :class net.rubyeye.xmemcached.GetsResponse, :cas 396}

And bulk get returns a HashMap contains existent items.

Increase/Decrease numbers


(xincr client "num" 1)
(xdecr client 
"num" 1)
(xincr client 
"num" 1 0)

Above codes try to increase/decrease a number in memcached with key "num",and if the item is not exists,then set it to zero.

Delete items

(xdelete client "num")

Compare and set

(xcas client "key" inc)

We use inc function to increase the current value in memcached and try to compare and set it at most Integer.MAX_VALUE times. xcas can be called as:

 (xcas client key cas-fn max-times)

The cas-fn is a function to return a new value,set the new value to

(cas-fn current-value)

Shutdown

(xshutdown client)

Flush

(xflush client)
(xflush client (InetSocketAddress. host port))

Statistics

(xstats client)

Example

Please see the example code in example/demo.clj

License

Copyright (C) 2011-2014 dennis zhuang[killme2008@gmail.com]

Distributed under the Eclipse Public License, the same as Clojure.

posted @ 2011-10-30 13:03 dennis 阅读(3155) | 评论 (0)编辑 收藏


    将自己在googlecode和github上的所有项目过了一遍,整理一张列表,列下一些还有点价值和用处的项目,都不是什么great job,纯粹是为了工作需要或者乐趣写的东西,看官要是有兴趣也可以瞧瞧。

 一 Java相关

1.Xmemcached,还算是比较多人使用的一个java memcached client,优点是效率和易用性,缺点是代码写的不怎么样,两年前发展到现在的东西,以后还会继续维护。

2.HS4J,看handlersocket的时候顺手写的客户端,我们公司内部某些项目在用,可能还有其他公司外的朋友在用,后来同事聚石贡献了一个扩展项目hs4j-kit,更易于使用,他写的代码很优雅漂亮,推荐一看。暂时没有精力维护。

3.Aviator,一个很初级的表达式执行引擎,行家看到肯定要笑话我。不过语法上很符合我自己的口味,我们自己的项目在用,也有几个朋友在用,会继续维护。

4.Jevent,一个玩具,其实是模仿libevent的一个java实现,对nio或者libevent的实现机制感兴趣的还可以看看。

5. Kilim,我fork的kilim实现,修改了nio调度器,使用多个reactor做调度效率更高,并添加了一个HttpClient的实现。

二 Android项目

学习android完全是玩票性质,有3个项目,对初学android开发的可能有点参考价值。

1.WhetherWeather,一个天气预报和告警的widget插件,UI太丑了。
2.UniqRecorder,写来记录儿子体重变化的小工具,可以自定义项目和生成曲线图,我自己还在用。
3.UniqTask,最近写的杀进程工具,绝对轻量级,没广告,也是我自己在用。

三 Clojure项目

1.cscheme,一个用clojure实现的scheme解释器,基于sicp这本书的解释器实现。
2.clojure-control,类似node-control的分布式部署和管理的DSL实现,挺好玩的,也有朋友在用,我自己还用不上,sunny有写了个很方便的lein插件node-control

clojure还写了一堆烂尾项目,就不拿出来恶心人了。

四 其他

1.node-zk-browser,一个展现和管理zookeeper的web应用,我们自己在用,基于node.js实现。
2.erlwsh,一个erlang的web shell实现,可以在浏览器里做erlang编程,被一些开源项目比如membase用到了。

    写这些东西对我自己最有好处,如果能顺便给他人带来好处,那是额外的好处。最近正处于我自己一生中也许是最大的转折关头,不能更新blog了,最后,祈求诸天神佛能带来奇迹。

posted @ 2011-10-09 20:23 dennis 阅读(6544) | 评论 (15)编辑 收藏


    xmemcached紧急发布1.3.5版本,主要是修复两个相对严重的bug:

Issue 154: 在重连本地memcached的时候,有可能出现重连无法成功的情况,导致连接丢失,详情见这里
Issue 155: 重连导致文件句柄数超过限制的bug,这是由于重连失败情况下没有合理关闭socket引起的,详情见这里

    如果你使用maven,简单升级版本即可:
 <dependency>
      
<groupId>com.googlecode.xmemcached</groupId>
      
<artifactId>xmemcached</artifactId>
      
<version>1.3.5</version>
 
</dependency>


    下载地址:http://code.google.com/p/xmemcached/downloads/list

    此版本推荐升级,最后感谢两位老外开发者的帮助: ilkinulasMrRubato

posted @ 2011-10-01 15:11 dennis 阅读(4165) | 评论 (6)编辑 收藏

    好吧,我知道现在是凌晨4点了,写完这个就睡觉。

    我一直很不爽android的ES任务管理器,它的广告设置的地方非常恶心,就放在kill键的下面,而且每次都突然跳出来,让你很容易错误点击。我很佩服他们能想出这种提高点击率的办法,但是又无比鄙视这种做法。今天(哦,不是昨天)晚上在twitter上说了,我想自己写个任务管理器,类似ES任务管理器,并且没有广告。那好吧,说干就干,奋斗了一个晚上,终于搞出了成果,这就是隆重登场的UniqTask,先看看运行时截图:



    这是运行在我的GS2上的截图。

    UniqTask的功能跟ES任务管理器的功能完全一致,可以记录kill的历史,每次启动UniqTask的时候自动标记过去kill过的进程。但是UniqTask完全绿色无毒,绝对没有广告,咔咔。

    许久没写android程序,拿起手来不是很顺利,折腾到现在才搞定,我将代码放到了github上,也提供了APK下载,非常欢迎试用啊。

    源码地址:
    https://github.com/killme2008/UniqTask
    APK下载:
    https://github.com/killme2008/UniqTask/blob/master/UniqTask.apk

    白天还有重要的事情要处理,睡觉去了。

posted @ 2011-09-20 04:10 dennis 阅读(3083) | 评论 (5)编辑 收藏

仅列出标题
共56页: 上一页 1 2 3 4 5 6 7 8 9 下一页 Last