放翁(文初)的一亩三分地

  BlogJava :: 首页 :: 新随笔 :: 联系 :: 聚合  :: 管理 ::
  210 随笔 :: 1 文章 :: 320 评论 :: 0 Trackbacks
 

Author:放翁(文初)

Date: 2010/11/23

Email:fangweng@taobao.com

mblog: http://t.sina.com.cn/fangweng

blog: http://blog.csdn.net/cenwenchu79/

         这篇文章将会从问题,技术背景,设计实现,代码范例这些角度去谈基于管道化和事件驱动模型的Web请求处理,其中的一些描述和例子也许不是很恰当,也希望得到更多的反馈。

 

业务架构设计:

         基于上述问题,通过两步走来解决。首先采用支持打破传统http request生命周期管理的Web容器(很多人说可以自己写,其实Web容器写起来并不是最麻烦的,如何做好兼容和照顾好每一个细节才是漫长发展的道路)。其次在容器新的线程生命周期管理基础上封装业务框架,为开发者屏蔽底层异步化和事件驱动模式带来的复杂流程管理内容。

                                                 

Pipe Service Framework

基础管道体系:

         很多时候设计和实现都会有很多细节上的差异,而这些差异往往是在事实验证后对体系的一种修订,也许修订后的结构不如修订前的清晰和优雅,但是确实在性能和结构上找到了平衡点,下面就看看两个基础管道体系的设计,后一个是前一个的演进。

                                                    

流程与角色说明:

         角色分成:Container(传统的容器)dispatcher(任务派发线程数量根据性能要求可以是1-m个),job pool(存储任务数据的本地缓存),event queue(任务状态发生变化的事件存储队列),pipe register center(管道链注册中心,根据job的自描述信息给出相关处理的单个管道或者管道链),thread pool(用于处理业务请求的线程池)

         流程描述如下:

1. 容器解析请求数据。

2. 创建任务并存储到job pool

3. 发送job执行消息到消息队列。

4. 释放容器线程,挂起请求资源。

5. Dispatcher阻塞方式的从event queue获取事件消息。

6. 如果是删除任务事件消息,则将剩余未发送数据flush到客户端,结束本次Http会话。(删除任务消息是在任务走完所有管道或者任务执行超时或者任务执行失败产生)

7. 如果是执行任务消息事件,则从job pool获取任务数据。

8. 根据任务信息去pipe register center获取pipe或者pipe chain

9. 将任务数据和管道信息发送给线程池。

10.              线程池分配线程执行任务,如果当前pipe chain执行后并没有完成job,则将job信息存储到job pool。(这块后面可以参看一下job 执行逻辑图)

11.              如果没有执行完毕,则可以创建一个或者多个执行事件激发下一次的处理,如果执行完毕,则创建一个删除任务消息激发任务结束处理。

问题:

1. 规范化带来的消息事件过多,线程切换消耗的问题。

2. Dispatcher自身任务是否繁重导致处理速度变慢。同时两套线程池管理麻烦(如果Dispatcher的个数为M也就可以看作另一个线程池)。

细节:

1. 利用容器本身支持请求挂起的方式,将容器线程池和业务线程池分割开来。

2. 如果所有子任务都是串行化且没有一个子任务是由外部系统来实施状态迁移,则可以在一个线程中完成所有子任务,减少线程切换和事件分发带来的消耗。最极端是退化到任务交由容器线程一并完成。

3. 当允许并行多个子任务执行时,只需要在并行子任务执行前的那个任务完成后,分发多个任务执行事件,并且任务执行事件指定要求处理的Pipe,就可以让分发器将当前任务分发给多个线程并行执行子任务,后续详细介绍子任务并行处理的过程。

4. Job会被多线程访问,因此必要的属性需要做成线程安全的。另一种模式就是抓取job的数据是个快照(clone),在结果产生后再锁住合并。


                                              


角色和流程说明:

         上图角色将线程池和消息队列做了合并,去掉了dispatcherevent queue合并到了 Thread Pool中。

1. 容器解析请求参数。

2. 创建任务并放置到任务缓存中。

3. 发送执行任务事件到线程池。

4. 释放容器线程资源。

5. 线程池从自身事件队列中获取事件。

6. 如果是删除事件,则直接删除任务,并发送数据到客户端,结束本地会话。

7. 如果是执行事件,则从pipe register center获取pipe或者pipe chain

8. 本地执行pipe或者pipe chain

9. 更新job 数据到缓存。

10.              创建执行或者删除消息事件到本地线程池队列或者直接连续执行。

差异:

1. 将分发器的功能散落到各个实际业务操作线程上,提升处理效率。(增加了对于消息队列的竞争,不过这个代价不是很大)

2. 线程可以连续执行子任务,减少任务事件数量,减少线程切换代价。(类似于自旋锁的方式,自己可以尽量的完成可以完成的任务,带来的问题就是对于不同任务多阶段并行执行的策略有所减弱)

细节:

         和第一种模式一样,可以退化这个模型到传统的一个web容器线程处理所有的子任务,减少线程切换代价。

四种方式的子任务执行说明:

                                            

传统的串行化任务执行模式,这种模式下可以交由单个线程全部执行,减少线程切换代价,另一方面假如3这个环节将会等待外部系统来更新状态并继续执行,那么到2执行完毕可以将job放入缓冲区,不产生事件消息,等外部操作完成后,创建执行事件消息,激发后续管道执行任务。(这种方式可以直接利用容器的挂起,来释放容器线程,而后续操作交由后台业务线程池执行)

这里有点说明一下,也是很多朋友问起的,关于上下文,原来的模式中上下文一种方式是通过方法参数不断传递,另一种方式保存在ThreadLocal中,而现在因为要切换线程可能就需要做拷贝或者线程之间传递。在后面几种模式中都建议直接将状态存储在本地缓存中共享,带来的问题就是多线程安全,一种方式是都获取此对象,然后操作时候做锁,一种是获得对象快照,然后合并结果时锁定。(这还是取决于多个线程之间处理是否需要看到对方的数据变化)

                                                 

         34两个任务可以并行完成,同时任何一个完成即可进入5,此时在2完成后,将会产生两个执行任务消息,并且自描述后续的Pipe,此时两个线程可以分别执行34,任何一个完毕后创建执行消息,激发任务处理进入到5流程中。(当发现已经进入5状态时,则忽略某个过期任务消息)

                                                   

         与上一个图的区别就是,34将不再是二选一,而是必须全执行完毕后才可以进入下一个阶段,因此job在执行后会先判断是否被并行的另一个任务执行过,确定全部都Ready,则发起创建执行消息。(在完成3或者4后都会判断当前合并结果是否符合进入下一环节的要求,符合再发起新的执行任务消息)

                                               

         此图是23两种方案的结合,因此参照3的做法完成。

支持异步化请求处理模型:

         上面的管道模型是较为通用的模型,但考虑到TOP现有业务状况和资源消耗在上述框架下定制了简单的异步支持模型:

                                                      

角色及流程说明:

         App第三方ISV软件,Container Web容器,PipeManager管道注册管理者(区别于通用的管道注册中心在于他对于所有请求都只管理一套Pipe Chain,由他将请求数据传入,并管理整个子任务的执行和分发),AsynTaskChecker是异步执行任务状态变更事件的检查者(类似于前面的事件分发器角色),ResultQueue保存事件及事件所带的上下文,workerThead是工作线程池。

1. 应用发起服务请求。

2. 容器调用管道管理器去执行任务管道链。(解析参数通过Lazy方式解析字节流被离散放到了各个管道环节中)

3. 检查容器是否对异步支持。(便于多容器兼容)

4. 创建上下文和输入输出对象(输入输出是管道基本传递参数,后面给出类图结构可知,上下文则是放置在ThreadLocal的数据,在多个管道逻辑中共享)。

5. 设置管道链执行的起始点(为了异步化后再次进入管道链无需重新执行前面执行过的管道作处理)。

6. 循环执行管道链。

如有异步管道在管道链中:

a)         复制管道上下文,保存当前执行的管道位置。

b)         挂起请求,释放容器线程资源。

c)         创建线程执行异步化管道。

d)         保存任务到队列,等待外部处理结束改变任务状态。

e)         推出循环执行后续管道

7. 判断是否是异步执行后的重入,如果是则提交异步结束事件,让容器在这次管道链执行后自动提交数据到客户端,结束本地Http请求会话。

8. 释放上下文等线程本地资源。

9. 返回容器,容器判断是否有挂起请求,如果请求结束则返回结果到客户端。

10.              容器自检查从挂起到当前是否处于执行超时(每次挂起请求就会产生一个超时事件,容器循环的校验这些事件)

11.              AsynTaskChecker循环的检查队列中的任务是否已经完成,如果状态变更为完成,则提交到给线程池继续执行后续的管道链。(处于性能考虑,可以将未完成的对象先不放入队列,等到后端服务处理完毕再放入,这样AsynTaskChecker消耗会大大降低,任务超时完全交给容器来处理,不由业务方来处理

细节:

         主要目的是将容器和业务线程池分开,这样业务线程池可以采用后面提到的权重线程池,通过对权重线程池的权重模型设置来满足根据业务或者根据服务健康状况来不均衡的分配线程执行不同的业务请求。

         后端系统的NIO异步方式能够利用操作系统的中断来激发改变对象状态,节省前端业务线程等待消耗。(如果后端是非异步化的操作,那么执行线程只是从容器线程变为了业务线程,当然可以让业务线程更加轻量)

         系统中尽量减少线程切换(能够一个线程干完的,尽量一个线程执行多个子任务),尽量减少内存拷贝复用对象(当然复用的代价就是同步问题,因此取决于数据操作冲突的概率选择使用快照还是引用)。


                                                

         上图的设计省略了队列和检查者,直接交由业务线程阻塞方式等待返回,并直接执行后续的管道,其实也就是对第一种场景的简化,在后端服务非异步方式的情况下,推荐这种方式。

总的来说,任务切割执行在设计上会觉得很清晰,但是还是要看整体处理时间的分布,如果整个事务处理消耗的时间很短,那么切割带来的复杂度和内部消耗就会得不偿失,采用简单的方式来实现可以满足业务上的需求(分离容器和业务线程,根据业务需求和系统动态性能决定线程资源分配),也能保证性能。

权重线程池:

         将请求全程处理从容器线程池分离到业务线程池后,可以使用带权重的线程池来动态调整请求线程资源分配,下面是一个简单的权重线程池的实现。

目标:执行的任务实现接口getkey来用于判断是否有空余线程可以执行请求处理任务。资源被分成两种:默认全局可使用资源,给特定请求预留资源。配置分成两种,限制最大使用线程数,预留特定请求的线程数。

                                                

         上图是简单的请求任务执行流程图,不多解释了。下图是状态转换图:

                                                

         Waitdoing的转换和initdoing的转换一样,就没有重复画了。内部的一些标识解释(totalCounter全局的计数器,maxThreadPoolSize线程池最大线程数,defaultCounter是没有设置预留或者限制的请求的计数器,defaultThresholdmaxThreadPoolSize – sum(预留线程)keyCounter表示设置了预留或者限制的请求自身标识(自身标识通过getkey接口获得)计数器,leave表示某一类请求设置的预留的数值,limit表示某一类请求设置的限制的数值)

上图中大括号中的是场景描述,例如:{Limit Mode}keyCounter <= limit && defaultCounter <= defaultThreshold表示在设置了限制模式的场景下符合当前请求类型计数器(当前请求类型通过请求实现getkey接口返回数据来区别)小于限制且默认计数器小于默认阀值时状态转变。

一点小技巧:在存储预留和限制的阀值时,因为存储在一个map中,通过将阀值设置为负数来区分开,这样节省了区分阀值类型的工作。(这点可以在很多场景中考虑,比如说有多个类型的数据配置需要存储,可以通过数据区间的划分来判断是什么类型的,提高判断效率)

Comet Push Framework:

         服务端实现:这期做了很简单的服务端实现,也是为了验证原型,标准的REST实现。

                                                

         POST操作,用于新增资源,操作后得到资源返回,会话非长连接。


                                                      

         GET操作,获得当前请求的资源,会被加入到资源关注者列表中,保持长连接,用于资源变更后推送变更后的资源对象。

                                                   

         PUT或者Delete操作,短链接,同时产生变化事件,交由后台线程执行通知动作。

                                                    

         批量执行通知消息。

1. ResourceBoard阻塞式的从队列中获取事件通知。

2. 创建临时事件存储Map

3. 如果存在通知事件,判断是否属于删除事件(此类事件发生在异常发生或者正常结束),如果是删除事件,立刻提交给后台线程池执行删除动作。(删除动作就是获取删除资源的follow列表,然后关闭所有follow的长连接)

4. 如果属于修改事件,判断当前资源的删除事件是否已经保存在临时存储Map中,如果有就不再加入修改事件直接忽略,否则就放入Map

5. 判断当前循环累积事件是否超过一定时间或者存储的消息量已经超过一定值,如果是就跳出循环,如果否,则继续从队列中获取数据循环判断,直到队列为空。

6. 批量执行临时存储中的事件消息,如果是修改,则获取资源的follows来推送变更后的数据。

细节:

    内部对于follow的有效性管理是在发送数据时判断的,如果出错就会产生删除事件。

对于消息批量处理主要是针对数据不断被修改,合并这些无用消息而作,但是某些场景也许就需要所有的修改痕迹,那就不能简单合并,因此资源需要提供类似合并的接口实现来保证获取的正确性。

         问题:

       海量长连接的支持。

       采用简单的Http InnerFrame + js实现客户端增量展现会使得页面数据越来越多,到一定程度需要放弃连接重新建立follow,减轻客户端和服务端双重压力。XHR的方式在各种浏览器中支持的不一致。


     代码实现,
Demo及测试效果

待续….

posted on 2010-11-25 14:44 岑文初 阅读(4092) 评论(7)  编辑  收藏

评论

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-11-25 15:11 BucketLI
我主要有3个问题
1.并行任务执行的线程池是自己实现的还是使用Concurrent包的线程池?然后线程池的饱和拒绝策略采用的是什么策略?因为Concurrent包中的线程池如果不扩展没有如同数据库连接池的等待超时策略,一旦满了就立刻执行饱和拒绝策略里面的行为,所以你实现的线程池(如果是的话)饱和拒绝策略是否加入了超时特性,或者干脆使用主线程执行?

2.并行执行任务结果需要同步合并的场景中,如果其中某个并行任务失败,是强行取消其他正常任务,并且回收资源,还是让其执行完毕?

3.另外通过不断轮询队列中的任务是否完成与通过CountDownLatch的通知,哪种会比较强?至少后者我需要维护闭锁在异常情景下的行为,否则会导致内存泄露,感觉比较复杂(这是我们一个场景实现的方式).  回复  更多评论
  

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-11-25 20:04 岑文初
@BucketLI
1.有权重的的线程池是基于concurrent上封装的,自己尝试过封装,性能差不多,就没什么意思了,现在配置maxpoolsize和minpoolsize是一样大的,这样当线程池到了上限后就放入队列,队列满了就丢弃,如果需要加入超时特性,其实就把线程池自带的队列中的数据封装带有时间戳,然后定时轮询清理掉过期任务。或者采用时间槽的概念,比如分成6个时间槽,每隔10分钟切换一个时间槽,那么当你任务20分钟是容忍极限的话,那么在每次切换时间槽的时候清除相隔2个位置的队列。
2.这个还是看你自己的策略,简单来说,如果一个必选步骤错了,就产生结束任务的事件,有后台线程执行删除任务操作。
3.CountDownLatch你是用线程阻塞?为每一个任务都分配一个?我不是很清楚你的做法,但是个人感觉CountDownLatch用于任务的join启动和结束比较好,你也可以描述一下你们的做法?  回复  更多评论
  

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-11-26 09:41 BucketLI
@岑文初
1.这个问题主要在压力测试的时候发现的,比如并行的任务线程池大小为10,队列为20,也就是同时能够提交的任务最大30个,后面SUBMIT或者EXECUTE被拒绝(直接拒绝策略),这确实是符合要求的,但有一个问题是,假设一次请求生成5个左右的并行任务,只要其中一个任务失败就这次请求失败,因为线程池一直处于忙碌状态,所以这5个任务很有可能被部分拒绝,实际上是数量>=2的并行任务都有可能被部分拒绝导致这次请求失败. 所以我的策略改成主线程来执行了,但我认为有个等待超时会更好.

3.因为主线程得收集到并行任务执行完毕的所有结果才能进行下一步操作(也就是屏障),有N个并行任务,我就实现一个CountDown N次的闭锁,然后主线程提交完任务后await,任务持有这个闭锁,执行完任务就 CountDown一下,直到CountDown完毕.异常处理是线程池任务持有主线程实例,发生异常的时候interrupt主线程,主线程响应这个中断异常,cancel其他任务以及清理资源.  回复  更多评论
  

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-11-26 18:33 anderslin
后端异步执行部分感觉太麻烦了,不知道是否有考虑过采用spring batch作为后端执行方案?  回复  更多评论
  

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-11-29 19:09 单飞
有点像工作流,干吗不用jbpm解决流程呢?  回复  更多评论
  

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-12-01 09:55 darren
请问你的sequence图,用那个uml工具画的,rose没法画循环,你的可以。  回复  更多评论
  

# re: 基于管道化和事件驱动模型的Web请求处理(二) 2010-12-01 21:00 岑文初
EA@darren
  回复  更多评论
  


只有注册用户登录后才能发表评论。


网站导航: