2009年6月11日
开源的Java Memcached Client——Xmemcached 发布1.2.4版本,这个版本主要的工作是BUG修正,主要改动如下:
1、修正bug,包括issue 68,issue 74。Issue 68修复后,现在可以正常地使用TokyoTyrantTranscoder来连接TokyoTyrant。
2、为修正的BUG添加新的单元测试。
3、将CachedData.MAX_VALUE修改为可修改状态,允许用户设置更大的值,这个值决定了可以向memcached存储的最大值,默认是1M(通过memcached的-I size选项),单位是字节:
CachedData.MAX_SIZE
=
2
*
1024
*
1024
;
//
修改为2M
4、更正用户指南的错误并补充部分资料。
下载地址: http://code.google.com/p/xmemcached/downloads/list
项目主页:http://code.google.com/p/xmemcached/
Wiki页 :http://code.google.com/p/xmemcached/w/list
Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器 Ruby Fiber指南(四)迭代器 上一节介绍了利用Fiber实现类unix管道风格的过滤链,这一节将介绍利用Fiber来实现迭代器,
我们可以将循环的迭代器看作生产者-消费者模式的特殊的例子。迭代函数产生值给循环体消费。所以可以使用Fiber来实现迭代器。协程的一个关键特征是它可以不断颠倒调用者与被调用者之间的关系,这样我们毫无顾虑的使用它实现一个迭代器,而不用保存迭代函数返回的状态,也就是说无需在迭代函数中保存状态,状态的保存和恢复交由Fiber自动管理。
这一节的介绍以一个例子贯穿前后,我们将不断演化这个例子,直到得到一个比较优雅的可重用的代码,这个例子就是求数组的全排列。如数组[1,2,3]的全排列包括6种排列:
2 3 1
3 2 1
3 1 2
1 3 2
2 1 3
1 2 3
全排列的递归算法实现很简单,我们用Ruby实现如下:
#全排列的递归实现
def permgen (a, n)
if n == 0 then
printResult(a)
else
n.times do |i|
a[n-1], a[i] = a[i], a[n-1]
permgen(a, n - 1)
a[n-1], a[i] = a[i], a[n-1]
end
end
end
def printResult (a)
puts a.join(" ")
end
permgen([1,2,3,4],4)
算法的思路是这样:将数组中的每一个元素放到最后,依次递归生成所有剩余元素的排列,没完成一个排列就打印出来。很显然,这里有消费者和生产者的关系存在,生产者负责产生排列,消费者负责打印任务,整个程序由消费者驱动,因此用Fiber改写如下:
第一步,将打印任务修改为Fiber#yield,生产者产生一个排列后将结果传递给消费者并让出执行权:
def permgen (a, n)
if n == 0 then
Fiber.yield(a)
……
end
第二步,实现一个迭代器工厂,返回一个匿名的迭代函数,迭代函数请求Fiber产生一个新的排列:
def perm(a)
f=Fiber.new do
permgen(a,a.size)
end
return lambda{ f.resume if f.alive? }
end
这样一来我们就可以利用一个while循环来打印全排列:
it=perm([1,2,3,4])
while a=it.call
printResult(a)
end
注意到,在perm方法中有一个很常见的模式,就是将对Fiber的resume封装在一个匿名函数内,在lua为了支持这种模式还特意提供了一个coroutine.wrap方法来方便编程,在Ruby Fiber中却没有,不过我们可以自己实现下,利用open-class的特性实现起来非常简单:
#为Fiber添加wrap方法
class Fiber
def self.wrap
if block_given?
f=Fiber.new do |*args|
yield *args
end
return lambda{|*args| f.resume(*args) if f.alive? }
end
end
end
Fiber#wrap方法跟new方法一样,创建一个新的Fiber,但是返回的是一个匿名函数,这个匿名函数负责去调用fiber的resume,利用wrap改写perm方法变得更简洁:
def perm(a)
Fiber.wrap{ permgen(a,a.size) }
end
但是还不够,while循环的方式还是不够优雅,每次都需要明确地调用迭代器的call方法,这一点让人挺不舒坦,如果能像for...in那样的泛型循环就好了,我们知道Ruby中的for...in其实是一个语法糖衣,都是转变成调用集合的each方法并传入处理的block,因此,要想实现一个优雅的迭代器,我们做下封装就好了:
class FiberIterator
def initialize
@fiber_wrap=Fiber.wrap do
yield
end
end
def each
while value=@fiber_wrap.call
yield value
end
end
end
那么现在的perm方法变成了创建一个迭代器FiberIterator:
def perm(a)
FiberIterator.new{ permgen(a,a.size) }
end
这样一来我们就可以通过for...in来调用迭代器了
it=perm([1,2,3,4])
for a in it
printResult(a)
end
Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器
Ruby Fiber指南(四)迭代器 在学习了Fiber的基础知识之后,可以尝试用Fiber去做一些比较有趣的事情。这一节将讲述如何使用Fiber来实现类似unix系统中的管道功能。在unix系统中,可以通过管道将多个命令组合起来做一些强大的功能,最常用的例如查找所有的java进程:
ps aux|grep java
通过组合ps和grep命令来实现,ps的输出作为grep的输入,如果有更多的命令就形成了一条过滤链。过滤器本质上还是生产者和消费者模型,前一个过滤器产生结果,后一个过滤器消费这个结果并产生新的结果给下一个过滤器消费。因此我们就从最简单的生产者消费者模型实现说起。
我们要展示的这个例子场景是这样:生产者从标准输入读入用户输入并发送给消费者,消费者打印这个输入,整个程序是由消费者驱动的,消费者唤醒生存者去读用户输入,生产者读到输入后让出执行权给消费者去打印,整个过程通过生产者和消费者的协作完成。
生产者发送是通过yield返回用户输入给消费者(还记的上一节吗?):
def send(x)
Fiber.yield(x)
end
而消费者的接收则是通过唤醒生产者去生产:
def receive(prod)
prod.resume
end
生产者是一个Fiber,它的任务就是等待用户输入并发送结果给消费者:
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
消费者负责驱动生产者,并且在接收到结果的时候打印,消费者是root fiber:
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
最终的调用如下:
consumer(producer())
完整的程序如下:
#生产者消费者
require 'fiber'
def send(x)
Fiber.yield(x)
end
def receive(prod)
prod.resume
end
def producer()
Fiber.new do
while true
x=readline.chomp
send(x)
end
end
end
def consumer(producer)
while true
x=receive(producer)
break if x=='quit'
puts x
end
end
if $0==__FILE__
consumer(producer())
end
读者可以尝试在ruby1.9下运行这个程序,每次程序都由消费者驱动生产者去等待用户输入,用户输入任何东西之后回车,生产者开始运行并将读到的结果发送给消费者并让出执行权(通过yield),消费者在接收到yield返回的结果后打印这个结果,因此整个交互过程是一个echo的例子。
最终的调用consumer(producer())已经有过滤器的影子在了,如果我们希望在producer和consumer之间插入其他过程对用户的输入做处理,也就是安插过滤器,那么新的过滤器也将作为fiber存在,新的fiber消费producer的输出,并输出新的结果给消费者,例如我们希望将用户的输入结果加上行号再打印,那么就插入一个称为filter的fiber:
def filter(prod)
return Fiber.new do
line=1
while true
value=receive(prod)
value=sprintf("%5d %s",line,value)
send(value)
line=line.succ
end
end
end
最终组合的调用如下:
consumer(filter(producer()))
类似unix系统那样,简单的加入新的fiber组合起来就可以为打印结果添加行号。
类似consumer(filter(producer()))的调用方式尽管已经很直观,但是我们还是希望能像unix系统那样调用,也就是通过竖线作为管道操作符:
producer | filter | consumer
这样的调用方式更将透明直观,清楚地表明整个过滤器链的运行过程。幸运的是在Ruby中支持对|方法符的重载,因此要实现这样的操作符并非难事,只要对Fiber做一层封装即可,下面给出的代码来自《Programming
ruby》的作者Dave Thomas的
blog:
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def |(other)
other.source = self
other
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
@source.resume
end
def output(value)
Fiber.yield(value)
end
end
这段代码非常巧妙,将Fiber和Ruby的功能展示的淋漓尽致。大致解说下,PipelineElement作为任何一个过滤器的父类,其中封装了一个fiber,这个fiber默认执行process,在process方法中可以看到上面生产者和消费者例子的影子,input类似receive方法调用前置过滤器(source),output则将本过滤器处理的结果作为参数传递给yield并让出执行权,让这个过滤器的调用者(也就是后续过滤器)得到结果并继续处理。PipelineElement实现了“|”方法,用于组合过滤器,将下一个过滤器的前置过滤器设置为本过滤器,并返回下一个过滤器。整个过滤链的驱动者是最后一个过滤器。
有了这个封装,那么上面生产者消费者的例子可以改写为:
class Producer < PipelineElement
def process
while true
value=readline.chomp
handle_value(value)
end
end
end
class Filter < PipelineElement
def initialize
@line=1
super()
end
def handle_value(value)
value=sprintf("%5d %s",@line,value)
output(value)
@line=@line.succ
end
end
class Consumer < PipelineElement
def handle_value(value)
puts value
end
end
现在的调用方式可以跟unix的管道很像了:
producer=Producer.new
filter=Filter.new
consumer=Consumer.new
pipeline = producer | filter | consumer
pipeline.resume
如果你打印pipeline对象,你将看到一条清晰的过滤链,
#<Consumer:0x8f08bf4 @fiber_delegate=#<Fiber:0x8f08a88>, @source=#<Filter:0x8f08db4 @line=1, @fiber_delegate=#<Fiber:0x8f08d60>, @source=#<Producer:0x8f09054 @fiber_delegate=#<Fiber:0x8f09038>>>>
Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器
Ruby Fiber指南(四)迭代器 这一篇其实也算是Fiber编程的基础篇,只不过参数传递算是一个比较重要的主题,因此独立一节。参数传递发生在两个Fiber之间,作为Fiber之间通讯的一个主要手段。
首先,我们可以通过resume调用给Fiber的block传递参数:
1 #resume传递参数给fiber
2 f=Fiber.new do |a,b,c|
3 p a,b,c
4 end
5
6 f.resume(1,2,3)
7
这个例子展示了怎么向fiber的block传递参数,f这个fiber简单地将传入的参数打印出来并终止。
其次,Fiber#yield也可以传递参数给调用resume作为返回结果,猜猜下面的代码打印什么?
1 #yield传递参数给resume
2 f=Fiber.new do |a,b|
3 Fiber.yield a+b,a-b
p a,b
4 end
5
6 p f.resume(10,10)
7 p f.resume(3,4)
8
正确的答案是:
[20, 0]
10
10
[10, 10]
让我们分析下代码的执行过程:
1、第6行第一次调用resume,传入10,10两个参数
2、f开始执行任务,它的任务是调用Fiber#yield,并将参数相加和相减的结果作为参数给yield,也就是执行Fiber.yield 20,10
3、f调用yield之后挂起,返回root fiber,yield的两个参数10、20作为返回结果打印。
4、第7行代码,root fiber再次调用resume并传入参数,f被切入并执行代码p a,b,打印a、b,a和b仍然是上次调用保存的10,而非resume传入的3和4。
5、f执行完毕,返回p a,b的结果作为resume结果,也就是[10,10]
刚才看到上面yield向resume传递参数的例子中第二次调用resume的参数3和4被忽略了,事实上如果还存在一次yield调用,那么3和4将被作为yield的返回结果使用,这就是我们接下来将看到的,通过resume调用传递参数作为fiber中yield的返回结果:
1 #resume传递参数给yield
2 f=Fiber.new do
3 1 + Fiber.yield
4 end
5
6 p f.resume(1)
7 p f.resume(2)
8
这次的打印结果将是:
nil
3
第一次调用resume传入的1将被忽略,因为f的block不需要参数,然后f执行1 + Fiber.yield,在yield的挂起,加法运算没有继续,因为yield的调用没有参数,因此第一次resume返回nil;第二次resume调用传入2,这时候2将作为Fiber#yield的调用结果跟1相加,完成加法运算,得到的结果就是3,这个结果作为fiber的返回值返回给调用者。
总结下上面我们谈到的四种传递参数的情形:通过resume向fiber的block传递参数、通过yield向调用者传递参数、通过resume向yield传递参数、fiber返回值传递给调用者。
Ruby Fiber指南(一)基础
Ruby Fiber指南(二)参数传递
Ruby Fiber指南(三)过滤器
Ruby Fiber指南(四)迭代器 这是一个Ruby Fiber的教程,基本是按照《Programming in lua》中讲述协程章节的顺序来介绍Ruby Fiber的,初步分为5节:基础、参数传递、过滤器、迭代器、应用。这是第一节,介绍下Ruby Fiber的基础知识。
Ruby 1.9引入了Fiber,通常称为纤程,事实上跟传统的coroutine——协程是一个概念,一种非抢占式的多线程模型。所谓非抢占式就是当一个协程运行的时候,你不能在外部终止它,而只能等待这个协程主动(一般是yield)让出执行权给其他协程,通过协作来达到多任务并发的目的。协程的优点在于由于全部都是用户空间内的操作,因此它是非常轻量级的,占用的资源很小,并且context的切换效率也非常高效(可以看看
这个测试),在编程模型上能简化对阻塞操作或者异步调用的使用,使得涉及到此类操作的代码变的非常直观和优雅;缺点在于容错和健壮性上需要做更多工作,如果某个协程阻塞了,可能导致整个系统挂住,无法充分利用多核优势,有一定的学习使用曲线。
上面都是场面话,先看看代码怎么写吧,比如我们写一个打印hello的协程:
1 require 'fiber'
2 f=Fiber.new do
3 p "hello"
4 end
5
6 p f.alive?
7 f.resume
8 p f.alive?
9
10 f.resume
11
附注:这里的代码都在ruby1.9.1-p378测试通过。
第一行先引入fiber库,事实上fiber库并不是必须的,这里是为了调用Fiber#alive?方法才引入。然后通过Fiber#new创建一个Fiber,Fiber#new接受一个block,block里就是这个Fiber将要执行的任务。Fiber#alive?用来判断Fiber是否存活,一个Fiber有三种状态:Created、Running、Terminated,分别表示创建完成、执行、终止,处于Created或者Running状态的时候Fiber#alive?都返回true。启动Fiber是通过Fiber#resume方法,这个Fiber将进入Running状态,打印"hello"并终止。当一个Fiber终止后,如果你再次调用resume将抛出异常,告诉你这个Fiber已经寿终正寝了。因此上面的程序输出是:
0
"hello"
false
fiber1.rb:10:in `resume': dead fiber called (FiberError)
from fiber1.rb:10:in `<main>'
眼尖的已经注意到了,这里alive?返回是0,而不是true,这是1.9.1这个版本的
一个BUG,1.9.2返回的就是true。不过在Ruby里,除了nil和false,其他都是true。
刚才提到,我们为了调用Fiber#alive?而引入了fiber库,Fiber其实是内置于语言的,并不需要引入额外的库,fiber库对Fiber的功能做了增强,具体可以先看看它的
文档,主要是引入了几个方法:Fiber#current返回当前协程,Fiber#alive?判断Fiber是否存活,最重要的是Fiber#transfer方法,这个方法使得Ruby的Fiber支持所谓全对称协程(
symmetric coroutines),默认的resume/yield(yield后面会看到)是半对称的协程(asymmetric coroutines),这两种模型的区别在于“挂起一个正在执行的协同函数”与“使一个被挂起的协同再次执行的函数”是不是同一个。在这里就是Fiber#transfer一个方法做了resume/yield两个方法所做的事情。全对称协程就可以从一个协程切换到任意其他协程,而半对称则要通过调用者来中转。但是Ruby Fiber的调用不能跨线程(thread,注意跟fiber区分),只能在同一个thread内进行切换,看下面代码:
1 f = nil
2 Thread.new do
3 f = Fiber.new{}
4 end.join
5 f.resume
f在线程内创建,在线程外调用,这样的调用在Ruby 1.9里是不允许的,执行的结果将抛出异常
fiber_thread.rb:5:in `resume': fiber called across threads (FiberError)
from fiber_thread.rb:5:in `<main>'
刚才我们仅仅使用了resume,那么yield是干什么的呢?resume是使一个挂起的协程执行,那么yield就是让一个正在执行的Fiber挂起并将执行权交给它的调用者,yield只能在某个Fiber任务内调用,不能在root Fiber调用,程序的主进程就是一个root fiber,如果你在root fiber执行一个Fiber.yield,也将抛出异常:
Fiber.yield
FiberError: can't yield from root fiber
看一个resume结合yield的例子:
1 f=Fiber.new do
2 p 1
3 Fiber.yield
4 p 2
5 Fiber.yield
6 p 3
7 end
8
9 f.resume # =>打印1
10 f.resume # => 打印2
11 f.resume # =>打印3
f是一个Fiber,它的任务就是打印1,2,3,第一次调用resume时,f在打印1之后调用了Fiber.yield,f将让出执行权给它的调用者(这里就是root fiber)并挂起,然后root fiber再次调用f.resume,那么将从上次挂起的地方继续执行——打印2,又调用Fiber.yield再次挂起,最后一次f.resume执行后续的打印任务并终止f。
Fiber#yield跟语言中的yield关键字是不同的,block中的yield也有“让出”的意思,
但是这是在同一个context里,而Fiber#yield让出就切换到另一个context去了,这是完全不同的。block的yield其实是匿名函数的语法糖衣,它是切换context的,跟Fiber不同的是,它不保留上一次调用的context,这个可以通过一个例子来区分:
1 def test
2 yield
3 yield
4 yield
5 end
6 test{x ||= 0; puts x+= 1}
7
这里的test方法接受一个block,三次调用yield让block执行,block里先是初始化x=0,然后每次调用加1,你期望打印什么?
答案是:
1
1
1
这个结果刚好证明了yield是不保留上一次调用的context,每次x都是重新初始化为0并加上1,因此打印的都是1。让我们使用Fiber写同一个例子:
1 fiber=Fiber.new do
2 x||=0
3 puts x+=1
4 Fiber.yield
5 puts x+=1
6 Fiber.yield
7 puts x+=1
8 Fiber.yield
9 end
10
11 fiber.resume
12 fiber.resume
13 fiber.resume
14
执行的结果是:
1
2
3
这次能符合预期地打印1,2,3,说明Fiber的每次挂起都将当前的context保存起来,留待下次resume的时候恢复执行。因此关键字yield是无法实现Fiber的,fiber其实跟continuation相关,在底层fiber跟callcc的实现是一致的(cont.c)。
Fiber#current返回当前执行的fiber,如果你在root fiber中调用Fiber.current返回的就是当前的root fiber,一个小例子:
1 require 'fiber'
2 f=Fiber.new do
3 p Fiber.current
4 end
5
6 p Fiber.current
7 f.resume
这是一次输出:
#<Fiber:0x9bf89f4>
#<Fiber:0x9bf8a2c>
表明root fiber跟f是两个不同的Fiber。
基础的东西基本讲完了,最后看看Fiber#transfer的简单例子,两个协程协作来打印“hello world”:
1 require 'fiber'
2
3 f1=Fiber.new do |other|
4 print "hello"
5 other.transfer
6 end
7
8 f2=Fiber.new do
9 print " world\n"
10 end
11
12 f1.resume(f2)
通过这个例子还可以学到一点,resume可以传递参数,参数将作为Fiber的block的参数,参数传递将是下一节的主题。
最近重读了《Programming Lua》,对协程做了重点复习。众所周知,Ruby1.9引入了Fiber,同样是coroutine,不过Ruby Fiber支持全对称协程(通过fiber库),而Lua只支持所谓半对称协程。
这里将对Lua、LuaJIT和Ruby Fiber的切换效率做个对比测试,测试场景很简单:两个coroutine相互切换达到5000万次,统计每秒切换的次数,各测试多次取最佳。
lua的程序如下:
c1=coroutine.create(function ()
while true do
coroutine.yield()
end
end)
c2=coroutine.create(function ()
while true do
coroutine.yield()
end
end)
local start=os.clock()
local count=50000000
for i=1,count do
coroutine.resume(c1)
coroutine.resume(c2)
end
print(4*count/(os.clock()-start))
考虑到在循环中事实上发生了四次切换:main->c1,c1->main,main->c2,c2->main,因此乘以4。
Ruby Fiber的测试分两种,采用transfer的例程如下:
require 'fiber'
require 'benchmark'
Benchmark.bm do |x|
MAX_COUNT=50000000
f1=Fiber.new do |other,count|
while count<MAX_COUNT
other,count=other.transfer(Fiber.current,count.succ)
end
end
f2=Fiber.new do |other,count|
while count<MAX_COUNT
other,count=other.transfer(Fiber.current,count.succ)
end
end
x.report{
f1.resume(f2,0)
}
end
Ruby Fiber采用resume/yield的例程如下:
require 'benchmark'
f1=Fiber.new do
while true
Fiber.yield
end
end
f2=Fiber.new do
while true
Fiber.yield
end
end
COUNT=50000000
Benchmark.bm do |x|
x.report{
COUNT.times do
f1.resume
f2.resume
end
}
end
测试环境:
CPU : Intel(R) Core(TM)2 Duo CPU P8600 @ 2.40GHz
Memory: 3GB
System : Linux dennis-laptop 2.6.31-14-generic #48-Ubuntu SMP
Lua : 5.1.4
ruby : 1.9.1p378
LuaJIT: 1.1.5和2.0.0-beta2
测试结果如下:
| | Lua | LuaJIT 1.1.5
| LuaJIT 2.0.0-beta2
| ruby-transfer
| ruby-resume/yield |
| 次数 | 6123698 | 9354536 | 24875620 | 825491 | 969649 |
结论:
1、lua的协程切换效率都是百万级别,luaJIT 2.0的性能更是牛叉,切换效率是原生lua的4倍,达到千万级别。
2、相形之下,Ruby Fiber的效率比较差了,十万级别。
3、Ruby使用transfer的效率比之resume/yield略差那么一点,排除一些测试误差,两者应该是差不多的,从ruby源码上看resume/yield和transfer的调用是一样的,resume还多了几条指令。
4、额外信息,Ruby Fiber和lua coroutine都只能跑在一个cpu上,这个测试肯定能跑满一个cpu,内存占用上,lua也比ruby小很多。
题外:从老家从早到晚总算折腾回了杭州,进站太早,火车晚点,提包带断,什么倒霉事也遇上了,先发个已经整理好的部分,后续仍待整理。
多道程序设计:分离进程为独立的功能
无论在协作进程还是在同一进程的协作子过程层面上,Unix设计风格都运用“做单件事并做好的方法“,强调用定义良好的进程间通信或共享文件来连通小型进程,提倡将程序分解为更简单的子进程,并专注考虑这些子进程间的接口,这至少需要通过以下三种方法来实现:
1、降低进程生成的开销(思考下Erlang的process)
2、提供方法(shellout、IO重定向、管道、消息传递和socket)简化进程间通信
3、提倡使用能由管道和socket传递的简单、透明的文本数据格式
Unix IPC方法的分类:
1、将任务转给专门程序,如system(3),popen调用等,称为shellout
2、Pipe、重定向和过滤器,如bc和dc
3、包装器,隐藏shell管线的复杂细节。
4、安全包装器和Bernstein链
5、主/从进程
6、对等进程间通信:
(1)临时文件
(2)信号
(3)系统守护程序和常规信号
(4)socket
(5)共享内存,mmap
远程过程调用(RPC)的缺憾:
1、RPC接口很难做到可显,如果不编写和被监控程序同样复杂的专用工具,也难以监控程序的行为。RPC接口和库一样具有版本不兼容的问题,由于是分布式的,因此更难被追查。
2、类型标记越丰富的接口往往越复杂,因而越脆弱。随着时间的推移,由于在接口之间传递的类型总量逐渐变大,单个类型越来越复杂,这些接口往往产生类型本体蠕变问题。这是因为接口比字符串更容易失配;如果两端程序的本体不能正确匹配,要让它们通信肯定很难,纠错更是难上加难。
3、支持RPC的常见理由是它比文本流方法允许“更丰富”的接口,但是接口的功能之一就是充当阻隔点,防止模块的实现细节彼此泄漏,因此,支持RPC的主要理由同时恰恰证明了RPC增加而不是降低了程序的全局复杂度。
Unix传统强烈赞成透明、可显的接口,这是unix文化不断坚持文本协议IPC的动力。
ESR在这里还谈到XML-RPC和SOAP等协议,认为是RPC和unix对文本流支持的一种融合,遗憾的是SOAP本身也成为一种重量级、不那么透明的协议了,尽管它也是文本协议。
线程是有害的:
线程是那些进程生成昂贵、IPC功能薄弱的操作系统所特有的概念。
尽管线程通常具有独立的局部变量栈,它们却共享同一个全局内存,在这个共享地址空间管理竞争和临界区的任务相当困难,而且成为增加整体复杂度和滋生bug的温床。除了普通的竞争问题之外,还产生了一类新问题:时序依赖。
当工具的作用不是控制而是增加复杂度的时候,最好扔掉从零开始。
微型语言:寻找歌唱的乐符
(注,这里谈的微型语言,就是现在比较热门的词汇DSL)
对软件错误模式进行的大量研究得出的一个最一致的结论是,程序员每百行程序出错率和所使用的编程语言在很大程度上是无关的。更高级的语言可以用更少的行数完成更多的任务,也意味着更少的bug。
防御设计不良微型语言的唯一方法是知道如何设计一个好的微型语言。
语言分类法:

对微型语言的功能测试:不读手册可以编写吗?
现代微型语言,要么就完全通用而不紧凑,要么就非常不通用而紧凑;不通用也不紧凑的语言则完全没有竞争力。
一些引申想法:我认为这个评判标准也可以用在任何编程语言上,以此来判断一些语言,C语言既通用又紧凑,Java是通用而不紧凑,ruby、Python之类的脚本语言也是如此,正则表达式(如果也算语言的话)是不通用而紧凑,Erlang也是通用而紧凑,awk却是既不通用也不紧凑,XSLT也可以归入不通用不紧凑的行列;Javascript是个另类,按理说它也是不通用不紧凑,说它不通用是因为它的主要应用范围还是局限在web开发的UI上,实际上Javascript也是门通用语言,但是很少会有人会用javascript去写批处理脚本,Javascript显然是不紧凑的,太多的边边角角甚至奇奇怪怪的东西需要你去注意,然而就是这样一门不通用不紧凑的语言现在却非常有前途,只能说时势所然。
设计微型语言:
1、选择正确的复杂度,要的是数据文件格式,还是微型语言?
2、扩展和嵌入语言
3、编写自定义语法,yacc和lex
4、慎用宏,宏的主要问题是滥用带来的奇怪、不透明的代码,以及对错误诊断的扰乱。
5、语言还是应用协议。
模块性:保持清晰,保持简洁
软件设计有两种方式:一种是设计得极为简洁,没有看得到的缺陷;另一种是设计得极为复杂,有缺陷也看不出来。第一种方式的难度要大得多。
模块化代码的首要特质就是封装,API在模块间扮演双重角色,实现层面作为模块之间的滞塞点,阻止各自的内部细节被相邻模块知晓;在设计层面,正是API真正定义了整个体系。
养成在编码前为API编写一段非正式书面描述的习惯,是一个非常好的方法。
模块的最佳大小,逻辑行200-400行,物理行在400-800之间。
紧凑性就是一个设计能否装入人脑中的特性。测试软件紧凑性的一个简单方法是:一个有经验的用户通常需要用户手册吗?如果不需要,那么这个设计是紧凑的。
理解紧凑性可以从它的“反面”来理解,紧凑性不等于“薄弱”,如果一个设计构建在易于理解且利于组合的抽象概念上,则
这个系统能在具有非常强大、灵活的功能的同时保持紧凑。紧凑也不等同于“容易学习”:对于某些紧凑
设计而言,在掌握其精妙的内在基础概念模型之前,要理解这个设计相当困难;但一旦理解了这个概念模型,整个视角就会改变,紧凑的奥妙也就十分简单了。紧凑也不意味着“小巧”。即使一个设计良好的系统,对有经验的用户来说没什么特异之处、“一眼”就能看懂,但仍然可能包含很多部分。
评测一个API紧凑性的经验法则是:API的入口点通常在7个左右,或者按《代码大全2》的说法,7+2和7-2的范围内。
重构技术中的很多坏味道,特别是重复代码,是违反正交性的明显例子,“重构的原则性目标就是提高正交性”。
DRY原则,或者称为SPOT原则(single
point of
truth)——真理的单点性。重复的不仅仅是代码,还包括数据结构,数据结构模型应该最小化,提倡寻找一种数据结构,使得模型中的状态跟真实世界系统的状态能够一一对应。
要提高设计的紧凑性,有一个精妙但强大的方法,就是围绕“解决一个定义明确的问题”的强核心算法组织设计,避免臆断和捏造,将任务的核心形式化,建立明确的模型。
文本化:好协议产生好实践
文本流是非常有用的通用格式,无需专门工具就可以很容易地读写和编辑文本流,这些格式是透明的。如果担心性能问题,就在协议层面之上或之下压缩文本协议流,最终产生的设计会比二进制协议更干净,性能可能更好。使用二进制协议的唯一正当理由是:如果要处理大批量的数据,因而确实关注能否在介质上获得最大位密度,或者是非常关心将数据转化为芯片核心结构所必须的时间或指令开销。
数据文件元格式:
1、DSV风格,DElimiter-Seperated
Values
使用分隔符来分隔值,例如/etc/passwd
适合场景:数据为列表,名称(首个字段)为关键字,而且记录通常很短(小于80个字符)
2、RFC
822格式
互联网电子邮件信息采用的文本格式,使用属性名+冒号+值的形式,记录属性每行存放一个,如HTTP
1.1协议。
适合场景:任何带属性的或者与电子邮件类似的信息,非常适合具有不同字段集合而字段中数据层次又扁平的记录。
3、Cookie-jar格式。简单使用跟随%%的新行符(或者有时只有一个%)作为记录分隔符,很适用于记录非结构化文本的情况。
适合场景:词以上结构没有自然顺序,而且结构不易区别的文本段,或适用于搜索关键字而不是文本上下文的文本段。
4、Record-jar格式,cookie-jar和RFC-822的结合,形如
name:dennis
age:21
%%
name:catty
age:22
%%
name:green
age:10
这样的格式。
适合场景:那些类似DSV文件,但又有可变字段数据而且可能伴随无结构文本的字段属性关系集合。
5、XML格式,适合复杂递归和嵌套数据结构的格式,并且经常可以在无需知道数据语义的情况下仅通过语法检查就能发现形式不良损坏或错误生成的数据。缺点在于无法跟传统unix工具协作。
6、Windows
INI格式,形如
[DEFAULT]
account=esr
[python]
directory=/home/ers/cvs/python
developer=1
[sng]
directory=/home/esr/WWW/sng
numeric_id=1001
developer=1
[fetchmail]
numeric_id=4928492
这样的格式
适合场景:数据围绕指定的记录或部分能够自然分成“名称-属性对”两层组织结构。
Unix文本文件格式的约定:
1、如果可能,以新行符结束的每一行只存一个记录
2、如果可能,每行不超过80个字符
3、使用”#“引入注释
4、支持反斜杠约定
5、在每行一条记录的格式中,使用冒号或连续的空白作为字段分隔符。
6、不要过分区分tab和whitespace
7、优先使用十六进制而不是八进制。
8、对于复杂的记录,使用“节(stanza)”格式,要么让记录格式和RFC
822电子邮件头类似,用冒号终止的字段名关键字作为引导字段。
9、在节格式中,支持连续行,多个逻辑行折叠成一个物理行
10、要么包含一个版本号,要么将格式设计成相互独立的的自描述字节块。
11、注意浮点数取整。
12、不要仅对文件的一部分进行压缩或者二进制编码。
应用协议元格式
1、经典的互联网应用元协议
RFC
3117《论应用协议的设计》,如SMTP、POP3、IMAP等协议
2、作为通用应用协议的HTTP,在HTTP上构建专用协议,如互联网打印协议(IPP)
3、BEEP:块可扩展交换协议,既支持C/S模式,又支持P2P模式
4、XMP-RPC、SOAP和Jabber,基于XML的协议。
透明性:来点光
美在计算科学中的地位,要比在其他任何技术中的地位都重要,因为软件是太复杂了。美是抵御复杂的终极武器。
如果没有阴暗的角落和隐藏的深度,软件系统就是透明的。透明性是一种被动品质。如果实际上能预测到程序行为的全部或大部分情况,并能建立简单的心理模型,这个程序就是透明的,因为可以看透机器究竟在干什么。
如果软件系统所包含的功能是为了帮助人们对软件建立正确的“做什么、怎样做”的心理模型而设计,这个软件系统就是可显的。
不要让调试工具仅仅成为一种事后追加或者用过就束之高阁的东西。它们是通往代码的窗口:不要只在墙上凿出粗糙的洞,要修整这些洞并装上窗。如果打算让代码一直可被维护,就始终必须让光照进去。例如fetchmail的-v选项将处理SMTP、POP的处理过程打印到标准输出,使得fetchmail行为具有可显性。
在“这个设计能行吗?”之后要提出的头几个问题就是“别人能读懂这个设计吗?这个设计优雅吗?”我们希望,此时大家已经很清楚,这些问题不是废话,优雅不是一种奢侈。在人类对软件的反映中,这些品质对于减少软件bug和提高软件长期维护性是最基本的。
要追求代码的透明性,最有效的方法是很简单,就是不要在具体操作的代码上叠放太多的抽象层。
OO语言使得抽象变得容易——也许是太容易了。OO语言鼓励“具有厚重的胶合和复杂层次”的体系。当问题领域真的很复杂,确实需要大量抽象时,这可能是好事,但如果coder到头来用复杂的办法做简单的事情——仅仅是为他们能够这样做,结果便适得其反。
所有的OO语言都显示出某种使程序员陷入过度分层陷阱的倾向。对象框架和对象浏览器并不能代替良好的设计和文档,但却常常被混为一谈。过多的层次破坏了透明性:我们很难看清这些层次,无法在头脑中理清代码到底是怎样运行的。简洁、清晰和透明原则通通被破坏了,结果代码中充满了晦涩的bug,始终存在维护问题。
胶合层中的“智能数据”却经常不代表任何程序处理的自然实体——仅仅只是胶合物而已(典型现象就是抽象类和混入(mixin)类的不断扩散)
OO抽象的另一个副作用就是程序往往丧失了优化的机会。
OO在其取得成功的领域(GUI、仿真和图形)之所以能成功,主要原因之一可能是因为在这些领域里很难弄错类型的本体问题。例如,在GUI和图形系统中,类和可操作的可见对象之间有相当自然的映射关系。
Unix风格程序设计所面临的主要挑战就是如何将分离法的优点(将问题从原始的场景中简化、归纳)同代码和设计的薄胶合、浅平透层次结构的优点相组合。
太多的OO设计就像是意大利空心粉一样,把“is-a”和“have
a”的关系弄得一团糟,或者以厚胶合层为特征,在这个胶合层中,许多对象的存在似乎只不过是为了在陡峭的抽象金字塔上占个位置罢了。这些设计都不透明,它们晦涩难懂并且难以调试。
为透明性和可显性而编码:
1、程序调用层次中(不考虑递归)最大的静态深度是多少?如果大于四,就要当心。
2、代码是否具有强大、明显的不变性质(约束)?不变性质帮助人们推演代码和发现有问题的情况。
3、每个API中各个函数调用是否正交?或者是否存在太多的magic
flags或者模式位?
4、是否存在一些顺手可用的关键数据结构或全局唯一的记录器,捕获系统的高层次状态?这个状态是否容易被形象化和检验,还是分布在数目众多的各个全局变量或对象中而难以找到?
5、程序的数据结构或分类和它们所代表的外部实体之间,是否存在清晰的一对一映射。
6、是否容易找到给定函数的代码部分?不仅单个函数、模块,还有整个代码,需要花多少精力才能读懂?
7、代码增加了特殊情况还是避免了特殊情况?每一个特殊情况可能对任何其他特殊情况产生影响:所有隐含的冲突都是bug滋生的温床。然而更重要的是,特殊情况使得代码更难理解。
8、代码中有多少个magic
number?通过审查是否很容易查出代码中的限制(比如关键缓冲区的大小)?
隐藏细节和无法访问细节有着重要区别。不要过度保护。
无论何时碰到涉及编辑某类复杂二进制对象的设计问题时,unix传统都提倡首先考虑,是否能够编写一个能够在可编辑的文本格式和二进制格式之间来回进行无损转换的工具?这类工具可称为文本化器(textualizer).
宁愿抛弃、重建代码也不愿修补那些蹩脚的代码。
“代码是活代码、睡代码还是死代码?”活代码周围存在一个非常活跃的开发社团。睡代码之所以“睡着”,经常是因为对作者而言,维护代码的痛苦超过了代码本身的效用。死代码则是睡得太久,重新实现一段等价代码更容易。
Unix哲学是自下而上,而不是自上而下的,注重失效,立足于丰富的经验,你不会在正规方法学和标准中找到它。
Unix管道的发明人Doug
McIlroy曾经说过:
1、让每个程序就做好一件事,如果有新任务就重新开始,不要往新程序中加入功能而搞的复杂。
2、假定每个程序的输出都会成为另一个程序的输入,哪怕那个程序是未知的。输出中不要有无关的信息干扰,避免使用严格的分栏格式和二进制格式输入。不要坚持使用交互式输入。
3、尽可能早将设计和编译的软件投入试用,哪怕是操作系统也不例外,理想情况下应该是几星期内,对抽劣的代码别犹豫,扔掉重写。
4、优先使用工具,而非拙劣的帮助来减轻编程任务的负担,工欲善其事,必先利其器。
Rob
Pike在《Notes on C
programming》中提到:
原则1:你无法断行程序会在什么地方耗费运行时间。瓶颈经常出现在想不到的地方,所以别急于胡乱找个地方改代码,除非你已经证实那儿就是瓶颈所在。
原则2:估量。在你没对代码进行估量,特别是没找到最耗时的那部分之前,别去优化速度。
原则3:花哨的算法,在n很小的适合通常很慢,而n通常很小。花哨算法的常数复杂度很大,除非你确定n一直很大,否则不要用花哨算法(即使n很大,也要优先考虑原则2)。
原则4:花哨的算法比简单的算法更容易出bug
,更难实现。尽量使用简单的算法配合简单的数据结构。
原则5:数据压倒一切。如果已经选择了正确的数据结构并且把一切组织得井井有条,正确的算法也就不言自明,编程的核心是数据结构,而不是算法。
原则6:没有原则6.
Ken
Thompson对原则4做了强调:
拿不准就穷举。
Unix哲学的17条原则:
1、模块原则:简洁的接口拼合简单的部件。
2、清晰原则:清晰胜于机巧。
3、组合原则:设计时考虑拼接组合。
4、分离原则:策略同机制分离,接口同引擎分离。
5、简洁原则:设计要简洁,复杂度能低则低。
6、吝啬原则:除非却无他法,不要编写庞大的程序。
7、透明性原则:设计要可见,以便审查和调试。
8、健壮原则:健壮源于透明与简洁。
9、表示原则:把知识叠入数据,以求逻辑质朴而健壮。
10、通俗原则:接口设计避免标新立异。
11、缄默原则:如果一个程序没什么好说的,就沉默。
12、补救原则:出现异常时,马上退出,并给出足够错误信息。
13、经济原则:宁花机器一分,不花程序员一秒。
14、生成原则:避免手工hack,尽量编写程序去生成程序。
15、优化原则:雕琢前先要有原型,跑之前先学会走。
16、多样原则:绝不相信所谓“不二法门”的断言。
17、扩展原则:设计着眼未来,未来总是比预想来得快。
Unix哲学之一言以蔽之:KISS
Keep
it simple,stupid!
应用unix哲学:
1、只要可行,一切都应该做成与来源和目标无关的过滤器。
2、数据流应尽可能的文本化(这样可以用标准工具来查看和过滤)。
3、数据库部署和应用协议应尽可能文本化(让人阅读和编辑)。
4、复杂的前端(用户界面)和后端应该泾渭分明。
5、如果可能,用c编写前,先用解释性语言搭建原型。
6、当且仅当只用一门编程语言会提高程序复杂度时,混用语言编程才比单一语言编程来得好。
7、宽收严发(对接收的东西要包容,对输出的东西要严格)
8、过滤时,不需要丢弃的消息绝不丢。
9、小就是美。在确保完成任务的基础上,程序功能尽可能的少。
最后强调的是态度:
要良好地运用unix哲学,你就应该不断地追求卓越,你必须相信,程序设计是一门技艺,值得你付出所有的智慧、创造力和激情。否则,你的视线就不会超越那些简单、老套的设计和实现;你就会在应该思考的时候急急忙忙跑去编程。你就会在该无情删繁就简的时候反而把问题复杂化——然后你还会反过来奇怪你的代码怎么会那么臃肿,那么难以调试。
要良好地运用unix哲学,你应该珍惜你的时间绝不浪费。一旦某人已经解决了某个问题,就直接拿来利用,不要让骄傲或偏见拽住你又去重做一遍。永远不要蛮干;要多用巧劲,省下力气在需要的时候用,好钢用到刀刃上。善用工具,尽可能将一切自动化。
软件设计和实现是一门充满快乐的艺术,一种高水平的游戏。如果这种态度对你来说听起来有些荒谬,或者令你隐约感到有些困窘,那么请停下来,想一想,问问自己是不是已经把什么给遗忘了。如果只是为了赚钱或者打发时间,你为什么要搞软件设计,而不是别的什么呢?你肯定曾经也认为软件设计值得你付出激情……
要良好地运用unix哲学,你需要具备(或者找回)这种态度。你需要用心。你需要去游戏。你需要乐于探索。
操作系统的风格元素:
1、什么是操作系统的统一性理念
2、多任务能力
3、协作进程(IPC)
4、内部边界
5、文件属性和记录结构
6、二进制文件格式
7、首选用户界面风格
8、目标受众
9、开发的门坎
经常在china-pub上买书,我的账号早已经到五星,再加上china-pub上很多新书首发,因此尽管当当网有时候更便宜,还是经常在china-pub买。不过这次我要出离愤怒了,同样是上个月29号下的单,当当在周一就送到了,而china-pub到今天5号竟然还没有送到,看订单信息是货已出库,并且发货时间在1月31号,从北京到杭州走了5天竟然还没到,选的什么快递公司。
不到也还罢了,更可恶的是售后服务,我在china-pub的客服论坛发帖,他们自己承诺工作时间内60分钟回复,回复个P啊,从昨天到今天没见一个人点击我的帖子,更何谈回复。OK,论坛不行,那么我发邮件吧,从china-pub的
客服服务页的客服email进去,填写表单,OK,提交失败?为什么,没有验证码,可是你TMD根本没显示验证码啊,这是什么狗屁程序,见下图
你看到验证码在哪里吗?刷新N遍愣是没出来,多牛B的客服email啊。
线上不行,那么我打电话可以吧,这电话是普通长途也还罢了,我自己掏钱没事,可总得有人接吧,事实是我早上打了3个电话,两个查询订单,等了N久没人接,靠,那我投诉吧,转投诉,一样没人接,我只能说china-pub你们真牛气,你们的客服是摆设不成?你们这么牛气,我也不敢买了,惹不起我还躲不起啊。
怎么让你对象跟Array或者Hash一样,可以使用[ ]操作符来获取属性值或者赋值? 问题其实就是如何定义
index操作符,在Ruby中可以这样做:
class Message
def initialize
@props=Hash.new
end
def [](key)
@props[key]
end
def []=(key,value)
@props[key]=value
end
end
m=Message.new
m[0]=1
p m[0]
m[:a]="hello"
p m[:a]
注意方法签名。
通讯层的改造使用了
google protocol buffers作为协议体,效率还是挺让人满意。编辑以.proto结尾的语法文件,没有语法高亮很不习惯,幸好protocolbuf提供了vim和emacs的扩展。下载非win32版本的protocol buffers的压缩包里,解压后有个editors目录,里面就是两个扩展文件:proto.vim是提供给vim爱好者的,而
protobuf-mode.el就是提供给emacs控的。
安装很简单,将protobuf-mode.el加入你的Emacs加载路径,然后在.emacs配置文件里加上这么两行代码:
(require 'protobuf-mode)
(setq auto-mode-alist (cons '(".proto$" . protobuf-mode) auto-mode-alist))
require是不够的,第二行将自动把.proto结尾的打开文件以protobuf-mode模式运行。运行时截图:
工具栏上多了个ProtocolBuffers菜单,有一些简单功能,如注释某段代码,代码跳转等等。
Java
Memcached Client——
Xmemcached的新版本
1.2.2正式released。这个小版本最主要的改进是允许遍历所有在memcached中的key,这是通过stats协议实现,具体信息可以看
这里。
1.2.2的主要改进如下:
1、添加一个
KeyIterator接口,这个迭代器接口用于遍历memcached中的所有key。由于是基于stats协议实现的,因此这个迭代过程
并非高效,请
慎重使用,并且迭代返回的key也并非实时,而是
当前快照。KeyIterator目前
仅在文本协议下可用,使用例子如下:
MemcachedClient client=
KeyIterator it=client.getKeyIterator(AddrUtil.getOneAddress("localhost:11211"));
while(it.hasNext())
{
String key=it.next();
}
2、添加一个新类
net.rubyeye.xmemcached.Counter,用于封装原始的incr/decr方法,提供类似AtomicLong原子类的API方便计数器的使用:
Counter counter=client.getCounter("counter",0);
counter.incrementAndGet();
counter.decrementAndGet();
counter.addAndGet(-10);
3、修复BUG,如
issue 71,issue 72,issue 70 etc.
4、声明废弃
net.rubyeye.xmemcached.buffer.BufferAllocator,现在哪怕你设置了这一属性也将被忽略,这个类将在以后的某个版本中移除
5、升级
yanf4j到1.1.0
Wiki和
用户指南都已经更新,欢迎使用并反馈任何建议或者bug报告。
项目主页:
http://code.google.com/p/xmemcached/
下载地址:
http://code.google.com/p/xmemcached/downloads/list
利用这个
小工具,可以生成豆瓣上记录的每年读的书,看的电影,听的音乐。我的记录,看过的书
看过的电影:
《Joel on software》谈到所谓抽象漏洞,简单来说就是抽象能解决90%的一般情况,而其他10%的情况你仍然需要跟抽象层面下的细节打交道,也就是抽象本身只能减少你的工作时间,而无法减少你的学习时间。道理简单,举几个例子。
以SQL语言为例,SQL是所谓说明性的语言,你所写的语句只是一条what,而how是如何做的无需关心,但是真的无需关心吗?事实上是不行,低效的SQL语句对数据库的性能损害非常大,作为程序员你需要知道SQL这个抽象层次下的部分内容,知道数据库是怎么执行这些语句,知道如何去避免一些最差实践。再比如分布式调用希望做到能跟本地调用一样的透明,但实际上还是不行的,网络的不确定性让RPC调用根本无法做到的类似本地调用那样的透明性。隐藏在RPC这个抽象层次下的网络通信细节,你不能不去care。抽象能帮你解决大多数情况,提高你的工作效率,但是剩下的一公里问题,仍然需要你花费更多时间和精力去了解并解决。这事实上也是一个优秀程序员跟普通程序的差别之一,学习了java编程,知道了collection集合框架,不代表你无需再去学习数据结构和算法。
Joel将这个现象称为漏洞抽象。事实上,我并不认为这是抽象本身的漏洞,这反而是软件的本质复杂性在作怪,抽象只能去化简偶然复杂性,例如函数、类、模块化等手段去组织代码,而本质的复杂性是无法避免的。举个不是那么恰当的例子,例如我们有这么个方法,传进一个参数list,我们要遍历list做一些事情,(我知道用迭代器才是正途,先允许我犯这么个错误),你可能这么写:
public void doSomething(List<String> list){
for(int i=0;i<list.size();i++){
String str=list.get(i);
//do something
}
}
这样的代码我估计在1.5有for语句增强之前不少人都写过,这样的代码有什么问题呢?考虑下list是ArrayList和LinkedList这两种情况,List是链表的抽象,但是链表的实现形式却是可以用数组或者引用链接,链表的实现形式不同,List.get(index)这个方法的效率会很成问题。我们都知道ArrayList适宜于随机访问,而LinkedList方便插入添加移除,在这个doSomething方法中,显然随机访问的诉求大于添加移除。在通常情况下,这样写都不会成为问题,但是如果这个doSomething方法被经常调用,并且list是一个LinkedList的情况下,这个方法就很可能成为性能瓶颈。我们寄希望于List这个接口可以让我们无需关心list的具体实现,然而现实是你仍然需要知道各种实现的区别和原理,这就是所谓漏洞抽象。这并非抽象的无力,你肯定不会反对“针对接口编程”这条原则,而是抽象本身解决不了本质复杂性,这里的本质复杂性就是链表的实现方法,随机访问与添加移除的平衡问题。在我们无法找到更好的链表实现方法来平衡随机访问与添加移除之前,这个本质复杂性就不是抽象能够解决的。
同样的现象出现在String、StringBuffer、StringBuilder的使用上,字符串的实现方法你仍然需要知道,这是绕不过去的本质复杂性。这里谈到的本质复杂性根本上也是现实世界的本质复杂性的反映,扯远些就更虚了。就现实的工作情况来看,不知道其他人有没有这样的经验,就是在自以为解决某个难题的时候,最后却发现难题以另一个面目出现,问题本身没有得到解决,只是以更好的方式被掩盖了。
无论是过程式、OO、函数式编程,解决的问题都是为了更好的抽象,抽象是个好东西,但是抽象无法解决那些本质性的问题,因此《人月神话》断言没有银弹,我们仍然需要跟狼人作战。
你不得不承认,写代码的效率跟周期性的情绪相关。以我为例,总存在着周期性的情绪波动,那段时间内基本不想写代码,上班就是收收邮件,看看网页,遗憾的是每个月都有那么几天。事实上,我认为在一天8小时的工作中,能有2、3个小时能达到忘我状态的工作,那已经是非常不错的事情。如果你是程序员,你肯定知道我说的忘我状态是什么。我在这里说的局限了,其实任何工作都可能进入这种忘我状态,这种状态下你的思维非常活跃,全神贯注,哪怕有人跟你说话你也会听而不闻,这种状态在你读一本非常有趣的小说的时候也会出现。这种状态下的你效率会非常高,例如我前段时间内就在一周内写了13000多行代码,600多个测试用例,为我们的系统重新实现了一个通信层。
看过很多讨论程序员工作效率的文章,据称研究表明要进入这种状态是至少要15分钟的时间,因此频繁地打断工作会阻碍你的工作效率,毕竟酝酿情绪也是需要时间的嘛。我有思考过怎么去尽量多地保持这种状态,排除那种对工作厌烦的情绪,毕竟拿着工资不干活心里还是会不安,况且看到周围那么多高效率的人,压力是难免的,让人担心的不是每天只有两个小时的高效工作,而是那段什么都不想干的时间。最后让我发现一个方法,说起来很简单,就是在出现这种低效状态的时候,强迫自己打开eclipse,而不是 firefox,强迫自己去写几行代码,如果这段时间内没有被其他事情打断,那么你还是容易进入一种不那么高效和愉悦的工作状态,至少能做到专心致志。当然,跟自己的情绪对抗可能不是世界上最困难的事情,也是其中之一,不过请你相信,只要你打开eclipse开始写代码并进入思考状态,那么你至少是可以暂时遗忘那些负面情绪的,甚至你的情绪可能因为解决了某个难题而高昂起来。
这个方法肯定不是什么新发现,我估计很多人会有同样的经验,今早在看《joel说软件》其中一篇文章《开火与运动》也谈到了同样的问题,joel也提到相同的经验:开了头就好。你不知道要费多少劲才能将一辆带齿轮的山地车运转起来,不过一旦它转起来之后,一切都跟骑一辆没有齿轮的自行车没什么两样。Joel还延伸了更多,开火的策略不仅仅是工作效率的问题,也是竞争策略,当你向敌人开火的时候,同时向敌人靠过去,活力会迫使敌人低下头而不能向你开火。竞争也是如此,压迫性的不断推出新东西让你的竞争对手疲于奔命,反而遗忘产品的根本性的目的,这些新东西可能只是为了替换过去不易用的东西,为什么不易用的东西在过去也会被推出来?那只是了为每天进步不断开火,让敌人忘记开火。
趁着下班前的半小时,回顾下2009年我都干了什么,有什么收获,有什么不足。
09年最重要的事情是我的儿子出生了,小家伙的到来带给全家很多欢乐,烦恼也不少,比如半夜总要被吵醒,晚上的读书也没办法那么专心读了。此外,我还在学习怎么当爸爸,写过这篇《
新爸爸指南》,记录下新生儿遇到的种种问题,逐渐经历自己生命的又一个阶段,这个历程很美好。
年初从广州公司辞职后,到了厦门一家创业公司,这不是一次很愉快的经历,回想起来我的问题不小。首先不该贸然地想去转换一个语言平台,写C++实在不是很好的编程的体验,乃至于我根本提不起工作热情;其次,心态不成熟,遇到问题和困扰的时候还是比较被动地解决,事实上完全没必要搞成这样,主动提出并且离开公司并不是什么丢人的事情。这次经历告诉我做决定的时候最好再慎重一点,毕竟自己不是一个人了,养家糊口是实实在在的责任。
在厦门的失败经历后,我投简历到了淘宝,尽管对于待遇并不是很满意,出于对淘宝的向往和有点理想主义的小情怀还是来到了杭州。刚来的时候,工作很顺利,生活比较糟糕,老婆孩子接到杭州后才好了点,生活比较有规律了。在淘宝,我所做的仍然是开发,写代码还是我的最爱,不过做的离业务的比较远,这正符合我的期望。负责的是一个消息中间件的开发,这个产品本身已经成型,并且应用在了淘宝的核心系统当中,现在每天通过这个MQ发送的消息量已经接近两亿,整个系统拥有数个集群,近30台机器。工作不单纯是开发,包括一些方案的设计和日常的维护工作,总体来讲还是很愉悦的体验。不足的地方,我仍然还是将自己视为一个纯粹的技术人员,对淘宝本身的业务、对其他系统的架构设计的了解都比较少,甚至于认识的人还是很局限,不过这个跟我的性格有关了。
技术上,这一年自我感觉没多大进步,除了将
sicp读完之外(我准备再度几遍),一些零零散散的技术书籍也看了不少,很少留下深刻的印象,比较有价值的是《
卓有成效的程序员》和《
C++网络编程》上下两卷。前者使我开始有意识地将自己一些重复性的工作自动化,提高自己的工作效率,后者让我对网络框架的设计模式有了相对全面的认识,也促进了我对Java网络编程的认识。今年也开发了个Java Memcached Client——
Xmemcached,并在大家的鼓励下持续地在改进,总算有不少用户在用,没有枉费精力和时间,也算今年的一个小小自得的地方。这里要特别感谢下曹晓刚,没有他的鼓励和他们公司的使用,
xmc还只是个人玩具。09年下半年又将不少精力放在了Erlang,过去学习是跟风,这次总算在项目中了有了个小应用,并且将《Erlang程序设计》和OTP设计原则来回读了几遍,对Erlang的兴趣越来越大,甚至于想是不是该去找份专职做Erlang的工作。对技术的学习,我还是没有一个明确的规划,任凭兴趣在几个领域里转来转去,这不是好现象,明年希望能更有计划和针对性地去学习,能跟自己的工作契合得更紧密一些。明年也希望能将《算法导论》读完,今年读了1/4,发现我的数学都已经抛到了Java国了,算法复杂度的推导总是看不懂,因此又去搞了几本数学书,从头再看看。
回顾完了,说说明年的愿望:
技术上:读完《算法导论》,继续深入Erlang,探索Erlang在工作中的实际应用,加强对其他系统的了解以及大型网站构建方面的学习
生活上:希望能全家一起去旅游一次,希望能将老爸老妈接过来玩一段时间。
昨天收到一个xmc的issue报告,大概的意思是将
Xmemcached与spring 2.5集成没有任何问题,但是将spring升级到3.0就会抛出一个异常,并且spring容器无法正常启动,异常信息类似“
Couldn't find a destroy method named 'shutdown' on bean XMemcachedClientFactoryBean”。更详细的情况可以看这里,这是这位朋友分析的结果,简单来说就是spring 3.0对于查找destroy method为空的情况处理不同了,过去是打个日志,现在是抛出一个异常。
问题说完,这里主要是介绍下这个问题的解决方式,事实上Xmemcached有一个没有被文档化的Spring配置方式,没有写入文档的主要考虑是以为wiki介绍的第一种方式已经足够,而builder的方式相对繁琐一些。通过XmemcachedClientBuilder的这个factory bean的factory-method,也就是build方法来构建MemcachedClient,这就可以绕开spring 3.0的这个问题。一个示范配置如下:
<bean name="memcachedClientBuilder" class="net.rubyeye.xmemcached.XMemcachedClientBuilder">
<constructor-arg>
<list>
<bean class="java.net.InetSocketAddress">
<constructor-arg>
<value>localhost</value>
</constructor-arg>
<constructor-arg>
<value>12000</value>
</constructor-arg>
</bean>
</list>
</constructor-arg>
</bean>
<bean name="memcachedClient" factory-bean="memcachedClientBuilder"
factory-method="build" destroy-method="shutdown" />
memcachedClientBuilder作为一个factory-bean,接受一个InetSocketAddress列表作为构造函数传入,最后MemcachedClient就可以通过factory-method——也就是build方法创建了。
多个节点情况下,可能你想设置权重,那么传入memcachedClientBuilder的第二个构造函数
参数权重数组即可:
<bean name="memcachedClientBuilder" class="net.rubyeye.xmemcached.XMemcachedClientBuilder">
<constructor-arg>
<list>
<bean class="java.net.InetSocketAddress">
<constructor-arg>
<value>localhost</value>
</constructor-arg>
<constructor-arg>
<value>12000</value>
</constructor-arg>
</bean>
<bean class="java.net.InetSocketAddress">
<constructor-arg>
<value>localhost</value>
</constructor-arg>
<constructor-arg>
<value>12001</value>
</constructor-arg>
</bean>
</list>
</constructor-arg>
<constructor-arg>
<list>
<value>1</value>
<value>2</value>
</list>
</constructor-arg>
</bean>
<bean name="memcachedClient" factory-bean="memcachedClientBuilder"
factory-method="build" destroy-method="shutdown" />
上面的例子将localhost:12000的权重设置为1,而localhost:12001的权重设置为2。除了这些配置外,XmemcachedClientBuilder还有其他选项,如配置一致性哈希算法、连接池等,完整的配置例子如下:
<bean name="memcachedClientBuilder" class="net.rubyeye.xmemcached.XMemcachedClientBuilder">
<!-- XMemcachedClientBuilder have two arguments.First is server list,and second is weights array. -->
<constructor-arg>
<list>
<bean class="java.net.InetSocketAddress">
<constructor-arg>
<value>localhost</value>
</constructor-arg>
<constructor-arg>
<value>12000</value>
</constructor-arg>
</bean>
<bean class="java.net.InetSocketAddress">
<constructor-arg>
<value>localhost</value>
</constructor-arg>
<constructor-arg>
<value>12001</value>
</constructor-arg>
</bean>
</list>
</constructor-arg>
<constructor-arg>
<list>
<value>1</value>
<value>2</value>
</list>
</constructor-arg>
<property name="connectionPoolSize" value="2"></property>
<property name="commandFactory">
<bean class="net.rubyeye.xmemcached.command.TextCommandFactory"></bean>
</property>
<property name="sessionLocator">
<bean class="net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator"></bean>
</property>
<property name="transcoder">
<bean class="net.rubyeye.xmemcached.transcoders.SerializingTranscoder" />
</property>
</bean>
<!-- Use factory bean to build memcached client -->
<bean name="memcachedClient" factory-bean="memcachedClientBuilder"
factory-method="build" destroy-method="shutdown"/>
最近读数理逻辑方面的资料,感慨自己没有早点接触逻辑学,如果在小学初中阶段就能有逻辑学的系统训练,我想我对数学的理解会完全不一样,对定理证明的思路也将大大开拓。中学时代对数学充满热情,基本上老师讲的都满足不了我,我的学习远远在课堂前面,初中的时候我已经接触了微积分、概率论、立体几何之类,教材是从我舅舅用过的教材垃圾堆里淘出来的,那时候我外婆准备卖了那堆书,就叫我去看看有没有想要的,印象中从那里掏出了不少好东西,武侠小说、马克思恩格斯全集、毛选、数学语文教材之流,在老家那个阅读资料极度缺少的地方,这些书成了我整个初中的精神食粮。上了高中后,家也搬到县城,离书城近了,并且学校有图书馆,更是如鱼得水。扯的太远,回到题目,以数学定理的证明为例,我们熟知的直接证明、反证法、数学归纳法、间接证明、存在性证明、唯一性证明其实背后都有牢固的逻辑定律在支撑,直接证明其实就是蕴含 p-> q,而反证法的原理是
¬¬p ↔ p,间接证明则是原命题与逆否命题等价 p-> q ↔ ¬q -> ¬p等等。如果在教授数学的同时,教授给学生背后的逻辑原理,那么将极大地开阔学生的视野并且让数学证明变的有趣。数理逻辑不仅仅对数学学习有益,对软件开发同样有帮助,例如以谓词和量词来精确地描述和分析系统规格,推理逻辑的使用也能启发你的设计思路,帮你找到一个复杂的设计的等价物。此外,你能更快地比别人做出某些逻辑难题,这也挺有趣。
Xmemcached 1.2.1正式发布,这是1.2.0发布以来的第一个小版本,主要改进是修复BUG、内部重构以及添加一些新特性。主要改进如下:
1、为Kestrel 1.2添加delete方法支持,kestrel 1.2发布后正式支持memcached delete协议
2、添加了一个新的序列化转换器 net.rubyeye.xmemcached.transcoders.TokyoTyrantTranscoder,专门提供给使用xmemcached连接Tokyo Tyrant的用户,这个转换器默认在value前加上4个字节的flag,因为Tokyo Tyrant不支持flag,所以默认无法存储除String之外的Java序列化类型。
3、添加两个新选项:
Transcoder.setCompressionThreshold(threshold)
Transcoder.setCompressionThreshold(threshold)
Transcoder接口添加了setCompressionThreshold用于设置压缩阀值,序列化后的value如果超过这个阀值将启用压缩,默认阀值是16K。
MemcachedClient.setSanitizeKeys(true|false)
MemcachedClient.setSanitizeKeys(true|false)
setSanitizeKeys用于决定是否启用URLEncoding来编码key,如果你用url作为key存储,这一特性能方便你的使用,默认为开启。
4、添加中文用户指南,比较完整的使用说明和选项说明,在线阅读。
5、内部优化,移除一些老代码和一些在1.1中被声明为Deprecated的方法。添加了更多单元测试。
6、BUG修复和对binary协议实现的部分优化。
项目主页:http://code.google.com/p/xmemcached/
下载地址: http://code.google.com/p/xmemcached/downloads/list
欢迎试用和反馈。
最近在锋爷的建议下开始读rabbitmq的源码,锋爷说这个项目已经很成熟,并且代码也很有借鉴和学习的意义,在自己写erlang代码之前看看别人是怎么写的,可以少走弯路,避免养成一些不好的习惯,学习一些最佳实践。读了一个星期,这个项目果然非常棒,代码也写的非常清晰易懂,一些细节的处理上非常巧妙,比如我这里想分享的网络层一节。
Rabbitmq是一个MQ系统,也就是消息中间件,它实现了AMQP 0.8规范,简单来说就是一个TCP的广播服务器。AMQP协议,你可以类比JMS,不过JMS仅仅是java领域内的API规范,而AMQP比JMS更进一步,它有自己的wire-level protocol,有一套可编程的协议,中立于语言。简单介绍了Rabbitmq之后,进入正题。
Rabbitmq充分利用了Erlang的分布式、高可靠性、并发等特性,首先看它的一个结构图:
这张图展现了Rabbitmq的主要组件和组件之间的关系,具体到监控树的结构,我画了一张图:
顶层是rabbit_sup
supervisor,它至少有两个子进程,一个是rabbit_tcp_client_sup,用来监控每个connection的处理进程
rabbit_reader的supervisor;rabbit_tcp_listener_sup是监控tcp_listener和
tcp_acceptor_sup的supervisor,tcp_listener里启动tcp服务器,监听端口,并且通过tcp_acceptor_sup启动N个tcp_accetpor,tcp_acceptor发起accept请求,等待客户端连接;tcp_acceptor_sup负责监控这些acceptor。这张图已经能给你一个大体的印象。
讲完大概,进入细节,说说几个我觉的值的注意的地方:
1、
tcp_accepto.erl,r对于accept采用的是异步方式,利用
prim_inet:async_accept/2方
法,此模块没有被文档化,是otp库内部使用,通常来说没必要使用这一模块,gen_tcp:accept/1已经足够,不过rabbitmq是广播程
序,因此采用了异步方式。使用async_accept,需要打patch,以使得socket好像我们从gen_tcp:accept/1得到的一样:
handle_info({inet_async, LSock, Ref, {ok, Sock}},
State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
%%这里做了patch
%% patch up the socket so it looks like one we got from
%% gen_tcp:accept/1
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
try
%% report
{Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
{PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
[inet_parse:ntoa(Address), Port,
inet_parse:ntoa(PeerAddress), PeerPort]),
%% 调用回调模块,将Sock作为附加参数
apply(M, F, A ++ [Sock])
catch {inet_error, Reason} ->
gen_tcp:close(Sock),
error_logger:error_msg("unable to accept TCP connection: ~p~n",
[Reason])
end,
%% 继续发起异步调用
case prim_inet:async_accept(LSock, -1) of
{ok, NRef} -> {noreply, State#state{ref=NRef}};
Error -> {stop, {cannot_accept, Error}, none}
end;
%%处理错误情况
handle_info({inet_async, LSock, Ref, {error, closed}},
State=#state{sock=LSock, ref=Ref}) ->
%% It would be wrong to attempt to restart the acceptor when we
%% know this will fail.
{stop, normal, State};
2、
rabbitmq内部是使用了多个并发acceptor,这在高并发下、大量连接情况下有效率优势,
类似java现在的nio框架采用多个reactor类似,查看tcp_listener.erl:
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
%%创建ConcurrentAcceptorCount个并发acceptor
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
error_logger:info_msg("started ~s on ~s:~p~n",
[Label, inet_parse:ntoa(LIPAddress), LPort]),
%%调用初始化回调函数
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
"failed to start ~s on ~s:~p - ~p~n",
[Label, inet_parse:ntoa(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
这里有一个技巧,如果要循环N次执行某个函数F,可以通过lists:foreach结合lists:duplicate(N,dummy)来处理。
lists:foreach(fun(_)-> F() end,lists:duplicate(N,dummy)).
3、
simple_one_for_one策略的使用,可以看到对于tcp_client_sup和tcp_acceptor_sup都采用了simple_one_for_one策略,而非普通的one_fo_one,这是为什么呢?
这牵扯到simple_one_for_one的几个特点:
1)simple_one_for_one内部保存child是使用dict,而其他策略是使用list,因此simple_one_for_one更适合child频繁创建销毁、需要大量child进程的情况,具体来说例如网络连接的频繁接入断开。
2)使用了simple_one_for_one后,无法调用terminate_child/2 delete_child/2 restart_child/2
3)start_child/2
对于simple_one_for_one来说,不必传入完整的child
spect,传入参数list,会自动进行
参数合并。
在一个地方定义好child
spec之后,其他地方只要start_child传入参数即可启动child进程,简化child都是同一类型进程情况下的编程。
在
rabbitmq中,tcp_acceptor_sup的子进程都是tcp_acceptor进程,在tcp_listener中是启动了
ConcurrentAcceptorCount个tcp_acceptor子进程,通过supervisor:start_child/2方法:
%%创建ConcurrentAcceptorCount个并发acceptor
lists:foreach(fun (_) ->
{ok, _APid} =
supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
注意到,这里调用的start_child只传入了
LSock一个参数,另一个参数CallBack是在定义child spec的时候传入的,参见tcp_acceptor_sup.erl:
init(Callback) ->
{ok, {{simple_one_for_one, 10, 10},
[{tcp_acceptor, {tcp_acceptor, start_link, [
Callback]},
transient, brutal_kill, worker, [tcp_acceptor]}]}}.
Erlang内部自动为simple_one_for_one做了
参数合并,最后调用的是tcp_acceptor的init/2:
init({
Callback,
LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
Error -> {stop, {cannot_accept, Error}}
end.
对于tcp_client_sup的情况类似,tcp_client_sup监控的子进程都是rabbit_reader类型,在
rabbit_networking.erl中启动tcp_listenner传入的处理connect事件的回调方法是是
rabbit_networking:start_client/1:
start_tcp_listener(Host, Port) ->
start_listener(Host, Port, "TCP Listener",
%回调的MFA
{
?MODULE, start_client, []}).
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Child),
Child ! {go, Sock},
Child.
start_client调用了supervisor:start_child/2来动态启动rabbit_reader进程。
4、
协议的解析,消息的读取这部分也非常巧妙,这一部分主要在rabbit_reader.erl中,对于协议的解析没有采用gen_fsm,而是实现了一个巧妙的状态机机制,核心代码在mainloop/4中:
%启动一个连接
start_connection(Parent, Deb, ClientSock) ->
process_flag(trap_exit, true),
{PeerAddressS, PeerPort} = peername(ClientSock),
ProfilingValue = setup_profiling(),
try
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
%延时发送握手协议
Erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
%进入主循环,更换callback模块,魔法就在这个switch_callback
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
%%注意到这里,handshake就是我们的回调模块,8就是希望接收的数据长度,AMQP协议头的八个字节。
handshake, 8))
魔法就在switch_callback这个方法上:
switch_callback(OldState, NewCallback, Length) ->
%发起一个异步recv请求,请求Length字节的数据
Ref = inet_op(fun () -> rabbit_net:async_recv(
OldState#v1.sock, Length, infinity) end),
%更新状态,替换ref和处理模块
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
异步接收Length个数据,如果有,erlang会通知你处理。处理模块是什么概念呢?其实就是一个状态的概念,表示当前协议解析进行到哪一步,起一个label的作用,看看mainloop/4中的应用:
mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
%%接收到数据,交给handle_input处理,注意handle_input的第一个参数就是callback
{inet_async, Sock, Ref, {ok, Data}} ->
%handle_input处理
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
%更新回调模块,再次发起异步请求,并进入主循环
mainloop(Parent, Deb,
switch_callback(State1, Callback1, Length1));
handle_input有多个分支,每个分支都对应一个处理模块,例如我们刚才提到的握手协议:
%handshake模块,注意到第一个参数,第二个参数就是我们得到的数据
handle_input(
handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
State = #v1{sock = Sock, connection = Connection}) ->
%检测协议是否兼容
case check_version({ProtocolMajor, ProtocolMinor},
{?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
true ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
%兼容的话,进入connections start,协商参数
ok = send_on_channel0(
Sock,
#'connection.start'{
version_major = ?PROTOCOL_VERSION_MAJOR,
version_minor = ?PROTOCOL_VERSION_MINOR,
server_properties =
[{list_to_binary(K), longstr, list_to_binary(V)} ||
{K, V} <-
[{"product", Product},
{"version", Version},
{"platform", "
Erlang/OTP"},
{"copyright", ?COPYRIGHT_MESSAGE},
{"information", ?INFORMATION_MESSAGE}]],
mechanisms = <<"PLAIN AMQPLAIN">>,
locales = <<"en_US">> }),
{State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT},
connection_state = starting},
frame_header, 7};
%否则,断开连接,返回可以接受的协议
false ->
throw({bad_version, ProtocolMajor, ProtocolMinor})
end;
其他协议的处理也是类似,通过动态替换callback的方式来模拟状态机做协议的解析和数据的接收,真的很巧妙!让我们体会到Erlang的魅力,FP的魅力。
5、序列图:
1)tcp server的启动过程:
2)一个client连接上来的处理过程:
小结:从上面的分析可以看出,rabbitmq的网络层是非常健壮和高效的,通过层层监控,对每个可能出现的风险点都做了考虑,并且利用了prim_net模块做异步IO处理。分层也是很清晰,将业务处理模块隔离到client_sup监控下的子进程,将网络处理细节和业务逻辑分离。在协议的解析和业务处理上虽然没有采用gen_fsm,但是也实现了一套类似的状态机机制,通过动态替换Callback来模拟状态的变迁,非常巧妙。如果你要实现一个tcp server,强烈推荐从rabbitmq中扣出这个网络层,你只需要实现自己的业务处理模块即可拥有一个高效、健壮、分层清晰的TCP服务器。
入这行也有四年了,从过去对软件开发的懵懂状态,到现在可以算是有一个初步认识的过程,期间也参与了不少项目,从一开始单纯的编码,到现在可以部分地参与一些设计方案的讨论,慢慢对设计方案的评判标准有一点感受。读软件工程、架构设计、模式之类的书,对于书中强调的一些标准和原则的感受只是感官浅层的印象,你可以一股脑从嘴里蹦出一堆词,什么开闭原则、依赖倒转、针对接口编程、系统的可伸缩性、可维护性、可重用等等,也仅仅停留在知道的份子上。不过现在,我对一个设计方案的评价标准慢慢变的很明确:
简单、符合当前和一定预期时间内的需求、可靠、直观(或者说透明)。
简单,不是简陋,这是废话了。但是要做到简单,却是绝不简单,简单跟第四点直观有直接的关系,简单的设计就是一个直观的设计,可以让你一眼看得清的设计方案,也就是透明。一个最简单的评判方法,你可以讲这个方案讲给一个局外人听,如果能在短时间内让人理解的,那么这个方案八成是靠谱的。记的有本书上讲过类似的方法,你有一个方案,那就拿起电话打给一个无关的人员,如果你能在电话里说清楚,就表示这个方案相当靠谱,如果不能,那么这个方案很可能是过度复杂,过度设计了。
简单的设计,往往最后得出的结果是一个可靠性非常高的系统。这很容易理解,一个复杂的设计方案,有很多方面会导致最后的实现会更复杂:首先是沟通上的困难,一个复杂的方案是很难在短时间内在团队内部沟通清楚,每个开发人员对这个方案的理解可能有偏差;其次,复杂的方案往往非常考验设计人员和开发人员的经验、能力和细致程度,复杂的方案要考量的方面肯定比简单方案多得多,一个地方没有考虑到或者不全面,结果就是一个充满了隐患的系统,时不时地蹦出一个BUG来恶心你,这并非开发人员的能力问题,而是人脑天然的局限性(天才除外,咳咳)。
第二点,符合当前和一定预期时间内的需求。我们都知道,不变的变化本身,指望一个方案永久解决所有问题是乌托邦的梦想。复杂方案的出炉通常都是因为设计人员过度考量了未来系统的需求和变化,我们的系统以后要达到10倍的吞吐量,我们的系统以后要有几十万的节点等等。当然,首先要肯定的是对未来需求的考量是必需的,一个系统如果实现出来只能应付短时间的需求,那肯定是不能接受的。但是我们要警惕的是过度考量导致的过度复杂的设计方案,这还有可能是设计人员“炫技”的欲望隐藏在里头。这里面有一个权衡的问题,比如这里有两个方案:一个是两三年内绝对实用的方案,简单并且可靠直观,未来的改进余地也不错;另一个方案是可以承载当前的几十倍流量的方案,方案看起来很优雅,很时尚,实现起来也相对复杂。如何选择?如果这个系统是我们当前就要使用的,并且是关键系统,那么我绝对会选择前一个方案,在预期时间内去改进这个方案,因为对于关键系统,简单和可靠是性命攸关的。况且,我坚定地认为一个复杂的设计方案中绝对隐藏着一个简单的设计,这就像一个复杂的数学证明,通常都可以用更直观更简单的方式重新证明(题外话,费尔马大定理的证明是极其复杂的,现在还有很多人坚信有一个直观简单的证明存在,也就是那个费尔马没有写下的证明)。最近我们的一个方案讨论也证明了这一点,一个消息优先级的方案,一开始的设想是相对复杂的,需要在存储结构和调度上动手脚,后来集思广益,最后定下的方案非常类似linux的进程调度策略,通过分级queue和时间片轮询的方式完美解决了优先级的问题。这让我想起了软件开发的“隐喻”的力量,很多东西都是相通相似的。
上面这些乱弹都是自己在最近碰到的一些讨论和系统故障想起的,想想还是有必要写下来做个记录。
Xmemcached 1.2.0发布到现在,从反馈来看,已经有不少用户尝试使用xmc作为他们的memcached client,并且1.2.0这个版本也比较稳定,没有发现严重的BUG。Xmemcached下一个版本是1.2.1,初步计划是在元旦左右发布,计划做出的改进如下:
1、重写所有的单元测试和集成测试,提高代码的健壮性
2、新增一些功能,如
issue 66。
3、移除deprecated方法
4、提供用户指南。
1.2.1之后初步的设想是开发1.3版本,现在xmc的最大问题是对yanf4j的依赖,耦合比较严重,1.3版本将抽象出网络层,解耦yanf4j和xmc,yanf4j也将重构并引入filter机制。1.3版本也将发布一个支持unix domain socket的附带项目,事实上这个项目已经初步开发完成,基于juds,但是性能并不理想,我的计划是自己写一个东西来替代juds,juds最大的问题是仅支持阻塞IO,没有使用poll/epoll、select之类。
总之,我可以确认的是xmc本身将继续发展,也希望更多的朋友来尝试使用,有任何问题和意见都可以反馈给我,我都将及时回复。
update:修复了在linux firefox上不兼容的BUG。
下午搞了个
Erlang web shell,可以在web页面上像eshell那样进行交互式的Erlang编程,方便学习和测试。这样一来,一个
erlwsh就可以服务多个client,只要你有网络和浏览器,随时随地可以敲上几行erlang看看结果。代码很简单,就不多说了,有兴趣的看看,通过
mochiweb的http chunk编码,client通过Ajax Post方式提交。眼见为实,看看运行截图:
工程在google code上:
http://code.google.com/p/erlwsh/
安装很简单,首先确保你已经安装了
Erlang,接下来:
svn checkout http://erlwsh.googlecode.com/svn/trunk/ erlwsh-read-only
cd erlwsh-read-only
scripts/install_mochiweb.sh
make
./start.sh
因为需要使用mochiweb,所以提供了下载并自动安装的脚本,这是litaocheng的大作。启动后访问 http://localhost:8000/shell 即可,have fun.
上篇是在兴奋的状态下当天晚上写的,这下篇拖到现在,印象也开始有点模糊了,以下全凭记忆,如有谬误敬请原谅。
CN-Erlounge第二天的topic,一开始是来自汕头的一位朋友介绍他对
利用单机程序组建分布式模型的分析与实例,实话说这个Topic很一般,基本上文不对题,并且举的例子没有说服力,有点为了用Erlang而用Erlang的感觉,其实跟Erlang关系不大,并且用Erlang搭建原型的话,还不如用python、ruby脚本来搞,后来跟同事的交流说,他介绍的还只是作坊式的一些做法,没有多少可借鉴的意义。
接下来是侯明远的《
基于Erlang实现的MMO服务器连接管理服务》,也就是Erlang在他的网游项目替换c++的一些尝试,效果非常好,不仅代码量大大减少,而且维护起来也非常容易。特别是他介绍了用Erlang搭建测试环境的尝试给了我们不少启发,事实上在回来后,我也尝试用Erlang写了个用于压测的代理服务器,不过由于我们的client仍然是Java,无法做到类似的分布式压测管理,仅用Erlang做中心的代理转发服务器。感受是Erlang做网络编程确实非常容易,Erlang的网络层将底层封装的非常完美,对于用户来说完全屏蔽了网络编程的复杂细节,并且提供了gen_server、gen_fsm这样的基础设施,宁外Erlang对binary数据的操作非常容易,对协议解析来说也是个巨大优势,整个程序就200多行代码,这还包括了一个通用的tcp服务器框架,借鉴了mochiweb的socket server实现。过去我对Erlang的message passing风格的理解还局限在actor模型上,进程之间相互发送消息,而其实Erlang的消息传递风格还体现在语言本身以及整个otp库的实现上,例如在accept一个连接后,我们调用服务器的逻辑代码:
accept_loop({Server, LSocket, M}) ->
{ok, Socket} = gen_tcp:accept(LSocket),
% spawn a new process to accept
gen_server:cast(Server, {accepted, self()}),
% initialize
State=M:init(Socket),
M:loop(State,Socket).
其中的M是你的逻辑模块,我们直接调用M:loop(State,Socket)进入逻辑模块的处理,这里的init和loop方法都是
约定,或者说模块的回调方法,你也可以理解成Ruby的duck typing。我不知道M有没有这两个方法,
我只是尝试给它们传递消息(调用),等待响应,这同样是
消息传递风格。同样,理解gen_server这样的behaviour的关键也是回调,你只要实现这些behaviour的回调方法,响应这些模块的消息,你将天然地拥有它们的强大功能。
接下来是周爱民的《
谈谈erlang网络环境下的几种数据流转形式》,怎么说呢,我听的懂,但是似乎没有抓到key point,老大们理解问题、分析问题的层次似乎不同了。不过其中讲到如何解决异步的通讯顺序问题对我们有一定借鉴价值。听了快两天的课,非常疲倦,加上头天晚上没睡好,这上午稀里哗啦就过了,中午组委会提供披萨,实话说好难吃啊,口味不惯。
压轴的是阿里云老吴的《
XEngine介绍》,第一次听说xengine是在阿里云计算公司的成立展览上,我跟开发人员有个短暂的交流,大概明白Erlang在xengine中扮演的角色。XEngine的野心很大,做中国的EC2、AppEngine,Erlang在其中的角色扮演了监控和协调的作用,利用它天然的分布式编程模型,xengine需要依赖阿里云的飞天计划,涵盖了分布式文件系统、MQ、通讯组件、分布式持久层等等,这些基础设施没搞好,xengine 还只能是“云”。集团内部早有消息是希望能统一集团内的各种基础设施,包括我们现在的这个MQ系统,我跟老吴开玩笑说他们做好了我们就要失业了:)。说到云计算,新浪的app engine据说已经开始内部测试了。那天我们老大还在说貌似国内只有阿里在搞app engine,没想到新浪倒走到前面咯。
后来是提议到公园去走走,大家随意聊聊,因为比较累以及跟各位大佬们不熟,阿宝朱GG他们为了赶飞机也提早走了,我们三个就提前撤退咯。杭州的出租车3点半交班,加上举办马拉松,打不到车,走了N远的路才坐到公交回家,到家天色刚晚。
CN-Erlounge IV的质量是我参加过的技术会议里面最高的,不过我其实没参加过多少技术会议,哈哈。总结下感受,从CN-Erlounge的Topic来看,已经有很多公司在实践中应用Erlang,我问arbow这一届Erlang大会跟过去的区别(过去我没参加
过),arbow就说这一届的实践经验的分享相对比较多,一个侧面也可以反应Erlang在国内的发展程度。不过Erlang还是小众语言,这从参会的人数上可以看出来。搜狐、校内、阿里这样的互联网巨头都开始尝试Erlang,一方面可以证明Erlang这个平台的吸引力,一方面也可以说明Erlang在国内已经开始进入实际应用阶段,对于许多还在观望的人来说,这是个好消息。
今天和同事一起去参加了
CN-Erlounge IV大会,大会的精彩程度超过我的预期,每个Topic都是精心准备并且非常实在,并且见到了很多只闻其名未见其人的大牛,比如传说中的T1、许老大、庄表伟、周爱民老师等。我们3个太早去了,8点半到了会场,发现大多数还没来,阿宝同学和锋爷他们更是9点多才出的门,因此整个会议进程都相应推迟了。
首先是校内网的成立涛做了《Erlang开发实践》的演讲,主题是一个典型的Erlang项目的开发流程、目录结构、单元测试、集成测试、常见问题解决等的概括性介绍,并且他还特意写了一个工程作为Sample,就是放在google code上的
erlips,非常佩服这样的专业精神。交流提到校内网已经部署了30个以上的Erlang节点做广告推送系统。Topic本身非常实在,并且有实际的代码和工程文件提供,可以作为了解Erlang开发基本过程的骨架工程。接下来是锋爷的重头戏《Erlang系统调优》,锋爷的Topic其实超出了Erlang的范畴,对所有系统的性能调优都有借鉴的意义,主题本身将Erlang平台和unix操作系统做了比较,认为Erlang本身也是个OS平台,并且介绍了Erlang提供的方方面面的工具,包括调试、诊断、性能剖析、监控、语言级别的优化、系统的优化选项、协议的选型、应该避免的陷阱、最佳实践等等,你不得不承认Erlang实在是太牛x了,这样的平台难怪能做到7个9的可靠性。这个Topic非常精彩,从ppt本身能看到的只是局部信息,等有视频的时候准备重新看看。
中午组委会提供了午餐,还是很方便,会议的地点吃饭地方不好找,不过晚饭没提供,我们跑了不远的路找了家小饭馆解决晚饭问题。下午的Topic一开始是
饿狼战役的创建者老范的介绍 ,饿狼战役是一个Erlang编写的棋牌型的游戏,玩家可以编写自己的指挥进程参与竞赛,实际上是作为一个Erlang学习的良好环境,类似过去非常流行的robot code游戏一样。我因为跟阿宝他们去闲逛,错过了大部分介绍,源码已经读过,不过我对AI一点也不了解,写个程序干掉英格兰卫兵还是没问题的,哈哈。后来是python社区的大妈介绍了
erlbattle社区的养成问题,谈到了一个社区的生命周期问题,如何去建设一个技术社区,我没有多少感受,不多扯了。饿狼战役推荐去看看,如果你对AI或者erlang有兴趣的话,可以去试试。接下来是T1做的《CUDA编程》的Topic,这个Topic我在提前看ppt的时候就觉的估计自己完全听不懂,最后果然如此,这是一个关于现在很热门GPU编程的Topic,讲述了如何在10ms内完成jpeg的压缩的优化手段和编程技巧,最终的结果是2毫秒多一点就搞定了这个需求,T1介绍的非常详细关于算法和技巧方面的细节,完全跟做工程的是两个世界。T1大大是火星人,咱就不多说了,景仰就行了。
晚上的两个Topic都是关于如何在C++中借鉴Erlang来实现消息传递风格的编程,51.com的qiezi和崔博介绍了他们的actor框架,他们是基于协程和线程池调度来实现伪同步调用,可以实现类似Erlang的进程风格的消息传递,但是要做一个工作就是将类似socket读写、文件读写、sleep这样的阻塞调用封装一下,有异步io可以利用的就利用异步IO,没有的就使用线程池调度,事实上他们做的事情正是Erlang已经帮你做了,现场有很多争议,认为这样还不如使用Erlang,因为你这样做无法避免两个问题:协程的异常处理和阻塞调用可能导致整个进程内的协程不可用,毕竟协程是非抢占式的,并且无法充分利用多CPU的优势。但是许老大提到,从工程实践角度,Erlang毕竟还是小众语言,维护和招人上都不容易,而系统的高可靠性可以从更高层次上去解决,而我们可以从Erlang借鉴这些做法来改进我们的传统语言编程模型。许老大还提到他认为的Erlang编程模型的两个缺点:一个是同步调用的死锁问题,一个是资源进程的独占性问题,这两个问题最终还是要回归到异步模型。这两个问题,我认为其实是一个问题,还是由于资源的有限和独占性引起的,像IO这样的资源就是,你可以将请求并行化设置回调函数不阻塞调用本身,但是实际的IO读写本身仍然是串行的,只不过将这部分工作交给谁来做的问题,我觉的这个问题对于任何编程语言都是一样的无法解决的。对于同步模型和异步模型本身,我更偏向于同步模型,这从xmc的API就可以看出来,同步模型更符合人类直觉,也易于使用,而异步模型容易将业务碎片化,不直观并且使用上也不便利。
以上是今天的流水账,有兴趣看到这的估计没几个,哇咔咔。
补充,遗漏了一个香港老外作的topic演讲,是关于erlang实现的restms,restms是一种restful的message协议,现在想来他主要介绍了restms协议以及一个erlang实现,也就是fireflymq,其中特别介绍了riak这样一个key-value store,它类似amazon dynamo,同样采用consistent hash,多节点备份,vector clock同步等等,比较特殊的地方是他可以将数据组织成类似web超链接形成的网状结构并存储和查询。
看到这么一个题目:
{3,2,2,6,7,8}排序输出,7不在第二位,68不在一起。
这样的题目似乎避免不了遍历,关键还在于过滤条件的安排,怎么让过滤的范围尽量地小。通常的做法是循环遍历,对于类似Prolog这样的语言来说,由于内置了推理引擎,可以简单地描述问题,让引擎来帮你做递归遍历,解决这类问题是非常简单的。Prolog好久没写,以Ruby的amb操作符为例来解决这道题目:
#结果为hash,去重
$hash={}
amb=Amb.new
array=[3,2,2,6,7,8]
class << array
alias remove delete
def delete(*nums)
result=dup
nums.each do |n|
result.delete_at(result.index(n)) if result.index(n)
end
result
end
end
#从集合选元素
one=amb.choose(*array)
two=amb.choose(*(array.delete(one)))
three=amb.choose(*(array.delete(one,two)))
four=amb.choose(*(array.delete(one,two,three)))
five=amb.choose(*(array.delete(one,two,three,four)))
six=amb.choose(*(array.delete(one,two,three,four,five)))
#条件1:第二个位置不能是7
amb.require(two!=7)
#条件2:6跟8不能一起出现
def six_eight_not_join(a,b)
"#{a}#{b}"!="68"&&"#{a}#{b}"!="86"
end
amb.require(six_eight_not_join(one,two))
amb.require(six_eight_not_join(two,three))
amb.require(six_eight_not_join(three,four))
amb.require(six_eight_not_join(four,five))
amb.require(six_eight_not_join(five,six))
#条件3:不重复,利用全局hash判断
def distinct?(one,two,three,four,five,six)
if $hash["#{one},#{two},#{three},#{four},#{five},#{six}"].nil?
$hash["#{one},#{two},#{three},#{four},#{five},#{six}"]=1 #记录
true
else
false
end
end
amb.require(distinct?(one,two,three,four,five,six))
puts "#{one},#{two},#{three},#{four},#{five},#{six}"
amb.failure
三个条件的满足通过amb.require来设置,这里安排的只是一种顺序,可以根据实际测试结果来安排这些条件的顺序以最大程度地提高效率。代码注释很清楚了,我就不多嘴了。Ruby amb的实现可以看
这里。什么是amb可以看
这个。
我们小组还要招java方面的工程师和架构师,老大说还有两个社招名额,不用就浪费了。我可以帮忙推荐,情况介绍如下:
工作地点:杭州
公司:阿里旗下子公司
工作内容:消息中间件或者分布式持久框架
要求:
1、因为是社招,请应届直接忽略,公司有校园招聘
2、最好能有2年以上工作经验(非硬性)
3、java基础牢固
4、对java并发、网络或者数据库编程有丰富经验,如果对JVM方面也了解,那更好
5、有分布式系统开发和设计经验尤佳
6、有一定的性能调优经验
7、熟悉各种常用的开源框架
8、对技术有追求、有激情,有气度,能沟通,能交流。
来这里你能得到什么:
1、大规模分布式系统的设计和开发
2、处理海量数据系统的设计与开发
3、大量的技术交流机会
4、相对轻松的、和谐的团队和工作氛围
欢迎有兴趣的朋友投递简历,我的email : killme2008@gmail.com
unix域协议并不是一个实际的协议族,而是在单个主机上执行客户/服务器通信的一种方法,是IPC的方法之一,特定于*nix平台。使用unix domain socket有三个好处:
1)在同一主机上,unix domain socket比一般的tcp socket快上一倍,性能因素这是一个主要原因。
2)unix domain socket可以在同一主机的不同进程之间传递文件描述符
3)较新的unix domain socket实现把客户的ID和组ID提供给服务器,可以让服务器作安全检查。
memcached的FAQ中也提到为了安全验证,可以考虑让memcached监听unix domain socket。Memcached支持这一点,可以通过-s选项指定unix domain socket的路径名,注意,为了可移植性,尽量使用绝对路径,因为Posix标准声称给unix domain socket绑定相对路径将导致不可预计的后果,我在linux的测试是可以使用相对路径。假设我将memcached绑定到/home/dennis/memcached,可以这样启动memcached:
memcached -s /home/dennis/memcached
端口呢?没有端口了,/home/dennis/memcached这个文件你可以理解成FIFO的管道,unix domain socket的server/client通过这个管道通讯。
libmemcached支持通过unix domain socket来访问memcached,基于libmemcached实现的client应该都可以使用这一功能。目前来看,java平台由于不支持平台相关的unix domain socket,因此无法享受memcached的这一特性。
不过有一个开源项目通过jni支持实现了unix domain socket,这个项目称为
juds。核心类就三个,使用非常简单。下载文件后,解压缩,make & make install即可。注意,Makefile中写死了JAVA_HOME,手工修改即可。看一个例子,经典的Time server:
package com.google.code.juds.test;
import java.io.IOException;
import com.google.code.juds.*;
import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
public class TimeServer {
public static void main(String[] args) {
try {
UnixDomainSocketServer server = new UnixDomainSocketServer(
"/home/dennis/time", UnixDomainSocket.SOCK_STREAM);
OutputStream output = server.getOutputStream();
Date date = new Date();
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
output.write(dateFormat.format(date).getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}
通过
UnixDomainSocketServer创建server,指定类型为SOCK_STREAM,juds也支持UDP类型。client的使用如下:
byte[] b = new byte[128];
UnixDomainSocketClient socket = new UnixDomainSocketClient("/home/dennis/time",
UnixDomainSocket.SOCK_STREAM);
InputStream in = socket.getInputStream();
in.read(b);
System.out.println("Text received: \"" + new String(b) + "\"");
socket.close();
显然,juds还只支持阻塞IO,考虑可进一步使用select、poll来扩展实现非阻塞IO。
最后一个例子,通过juds访问memcached的unix domain socket,简单的version协议调用:
byte[] b = new byte[128];
UnixDomainSocketClient socket = new UnixDomainSocketClient("/home/dennis/memcached",
UnixDomainSocket.SOCK_STREAM);
OutputStream out = socket.getOutputStream();
String text = "version\r\n";
out.write(text.getBytes());
InputStream in = socket.getInputStream();
in.read(b);
System.out.println("Text received: \"" + new String(b) + "\"");
socket.close();
输出
Text received: "VERSION 1.4.1"
字符串操作是任何一门编程语言中最常用的操作之一,scheme也提供了一系列procudure来操作字符串。
1、字符串的比较,有6个,分别是string=? string>? string<? string>=? string<=?
这与其他语言中对string的比较并无不同,比较字符和长度。
例子:
(string=? "mom" "mom")

#t
(string<? "mom" "mommy")

#t
(string>? "Dad" "Dad")

#f
(string=? "Mom and Dad" "mom and dad")

#f
(string<? "a" "b" "c")

#t
注意这些比较操作是大小写敏感。相应的,大小写不敏感的版本:
procedure: (string-ci=? string1 string2 string3 ...)
procedure: (string-ci<? string1 string2 string3 ...)
procedure: (string-ci>? string1 string2 string3 ...)
procedure: (string-ci<=? string1 string2 string3 ...)
procedure: (string-ci>=? string1 string2 string3 ...)
2、从字符构造字符串,使用string过程
(string #\a) => "a"
(string #\a #\b #\c) => "abc"
注意,换行字符是#\newline,回车字符是#\return
3、重复N个字符构造字符串
(make-string) => ""
(make-string 4 #\a) =>"aaaa")
4、字符串长度 string-length
(string-length "") =>0
(string-length "dennis") => 6
5、取第N个字符,相当于java中的charAt:
(string-ref "hi there" 0)

#\h
(string-ref "hi there" 5)

#\e
6、修改字符串的第N个字符:
(string-set! "hello" 0 #\H) => "Hello"
7、拷贝字符串:
(let ((str "abc"))
(eq? str (string-copy str))) => #f
(let ((str "abc"))
(equal? str (string-copy str))) => #t
8、拼接字符串,string-append
(string-append) => ""
(string-append "abc" "defg") => "abcdefg"
9、截取子串
(substring "hi there" 0 1)

"h"
(substring "hi there" 3 6)

"the"
(substring "hi there" 5 5)

""
10、填充字符串
(let ((str (string-copy "sleepy")))
(string-fill! str #\Z)
str)

"ZZZZZZ"
11、与list的相互转换
(string->list "")

()
(string->list "abc")

(#\a #\b #\c)
(list->string '())

""
(list->string '(#\a #\b #\c))

"abc"
(list->string
(map char-upcase
(string->list "abc")))

"ABC"
一张截图,Java虽然号称跨平台,然而涉及到跟网络相关时,还是依赖于各个平台的实现。对写java网络编程的朋友有点价值。
基于java nio的java memcached client——xmemcached正式发布1.2.0-stable版本,这是一个稳定的版本,在1.2.0-RC2的基础上做了性能改进和BUG修复。在用户的反馈下,发现了数个比较严重的BUG,因此这个版本建议升级以规避这些可能出现的BUG。相比于1.2.0-RC2,主要的改进如下:
1、添加心跳检测,默认开启这个特性,你可以通过
memcachedClient.setEnableHeartBeat(false);
memcachedClient.setEnableHeartBeat(false);
来关闭。心跳检测出于兼容性考虑是基于version协议实现的。
2.添加新的incr/decr方法,允许传入初始值,如果指定的key不存在的时候,就将该值add到memcached。具体参见API文档。
3.修复数个BUG,如Issue 55,Issue 57,Issue 58,Issue ,Issue 60。具体请看这里。
总结1.2相比于1.1版本的主要新增特性列表如下:
1、支持完整的memcached二进制协议
2、支持java nio连接池。
3、支持kestrel。
4、支持与hibernate-memcached的集成
5、日志从common-logging迁移到slf4j
6、简化构建等。
7、兼容JDK5。
欢迎试用并反馈,我的email: killme2008@gmail.com
喜欢奇幻的朋友可以瞧瞧,据说是烟大推荐的,同样在17K上,国人写的西方奇幻,还未写完,但是长度已经足够你好好享受。味道呢?战斗类似《博德之门》,语言比较绕,没读过此类作品的可能不习惯,文笔没得说,写作水准暂未发现下降趋势,值得长期追。名字比较奇怪,《
昆古尼尔》,是北欧神话中大神奥丁的所有物——永恒之枪。故事就不剧透了,前两章可能比较晕,但是坚持看下去就能搞明白啦。如果你实在着急呢,可以看看这个
背景介绍。
采用的是
jboss netty的benchmark,环
境是两台linux机器,都是4核16G内存以及2.6内核,网络环境是公司内网,带宽是1Gbps
,JDK1.6.0_07。对比的是
mina 2.0M6和
yanf4j 1.0-stable,两者都在压到16K,5000并发的时候客户端退出,因此后面给出的图有个16K的在5000并发为0,事实上只是几个连接失败,但是benchmark client就忽略了这个数据。实际过程还测试了1万并发连接的情况,但是由于测试客户端很容易退出,因此最后还是选定最大并发5000。注意,并非mina和yanf4j无法支撑1万个连接,而是benchmark client本身的处理,再加上内核tcp参数没有调整造成的。
首先看源码,mina的Echo Server:
package org.jboss.netty.benchmark.echo.server;
import java.net.InetSocketAddress;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.netty.benchmark.echo.Constant;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
* @version $Rev: 394 $, $Date: 2008-10-03 12:55:27 +0800 (星期五, 03 十月 2008) $
*
*/
public class MINA {
public static void main(String[] args) throws Exception {
boolean threadPoolDisabled = args.length > 0 && args[0].equals("nothreadpool");
SocketAcceptor acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors());
acceptor.getSessionConfig().setMinReadBufferSize(Constant.MIN_READ_BUFFER_SIZE);
acceptor.getSessionConfig().setReadBufferSize(Constant.INITIAL_READ_BUFFER_SIZE);
acceptor.getSessionConfig().setMaxReadBufferSize(Constant.MAX_READ_BUFFER_SIZE);
acceptor.getSessionConfig().setThroughputCalculationInterval(0);
acceptor.getSessionConfig().setTcpNoDelay(true);
acceptor.setDefaultLocalAddress(new InetSocketAddress(Constant.PORT));
if (!threadPoolDisabled) {
// Throttling has been disabled because it causes a dead lock.
// Also, it doesn't have per-channel memory limit.
acceptor.getFilterChain().addLast(
"executor",
new ExecutorFilter(
Constant.THREAD_POOL_SIZE, Constant.THREAD_POOL_SIZE));
}
acceptor.setHandler(new EchoHandler());
acceptor.bind();
System.out.println("MINA EchoServer is ready to serve at port " + Constant.PORT + ".");
System.out.println("Enter 'ant benchmark' on the client side to begin.");
System.out.println("Thread pool: " + (threadPoolDisabled? "DISABLED" : "ENABLED"));
}
private static class EchoHandler extends IoHandlerAdapter {
EchoHandler() {
super();
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
session.write(((IoBuffer) message).duplicate());
}
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
session.close();
}
}
}
再看Yanf4j的Echo Server,没有多大区别:
package org.jboss.netty.benchmark.echo.server;
import java.nio.ByteBuffer;
import org.jboss.netty.benchmark.echo.Constant;
import com.google.code.yanf4j.config.Configuration;
import com.google.code.yanf4j.core.Session;
import com.google.code.yanf4j.core.impl.HandlerAdapter;
import com.google.code.yanf4j.core.impl.StandardSocketOption;
import com.google.code.yanf4j.nio.TCPController;
public class Yanf4j {
public static void main(String[] args) throws Exception {
boolean threadPoolDisabled = args.length > 0
&& args[0].equals("nothreadpool");
Configuration configuration = new Configuration();
configuration.setCheckSessionTimeoutInterval(0);
configuration.setSessionIdleTimeout(0);
configuration
.setSessionReadBufferSize(Constant.INITIAL_READ_BUFFER_SIZE);
TCPController controller = new TCPController(configuration);
controller.setSocketOption(StandardSocketOption.SO_REUSEADDR, true);
controller.setSocketOption(StandardSocketOption.TCP_NODELAY, true);
controller.setHandler(new EchoHandler());
if (!threadPoolDisabled) {
controller.setReadThreadCount(Constant.THREAD_POOL_SIZE);
}
controller.bind(Constant.PORT);
System.out.println("Yanf4j EchoServer is ready to serve at port "
+ Constant.PORT + ".");
System.out
.println("Enter 'ant benchmark' on the client side to begin.");
System.out.println("Thread pool: "
+ (threadPoolDisabled ? "DISABLED" : "ENABLED"));
}
static class EchoHandler extends HandlerAdapter {
@Override
public void onMessageReceived(final Session session, final Object msg) {
session.write(((ByteBuffer) msg).duplicate());
}
@Override
public void onExceptionCaught(Session session, Throwable t) {
session.close();
}
}
}
两者都启用线程池(16个线程),开启TCP_NODELAY选项,Client采用SYNC模式,压测结果如下(仅供参考),分别是数据大小为128、1K、4K和16K情况下,随着并发client上升吞吐量的对比图:
系统的资源消耗来看,Mina的load相对偏高。
上篇文章我谈到了
java nio的一个严重BUG,并且介绍了jetty是如何规避这个BUG的。我在将这部分代码整合进yanf4j的过程中发现了不少误判的情况,让我们看看误判是怎么发生的。jetty的解决方案是通过在select返回为0的情况下,计量Selector.select(timeout)执行的时间是否与传入的timeout参数相差太大(小于timeout的一半),如果相差太大,那么认为发生一次bug,如果发生的次数超过设定值,依据严重程度进行处理:第一尝试取消任何有效并且interestOps等于0的SelectionKey;第二次就是重新创建一个Selector,并将有效的Channel注册到新的Selector。误判的发生就产生于这个时间的计量上,看javadoc可以发现它是这样描述这个方法的:
This method performs a blocking
selection operation. It
returns only after at least one channel is selected, this selector's
wakeup
method is invoked, or the current thread is interrupted, whichever comes first
意思就是说这个方法将阻塞select调用,直到下列三种情况之一发生才返回:至少一个channel被选中;同一个Selector的wakeup方法被调用;或者调用所处的当前线程被中断。这三种情况无论谁先发生,都将导致select(timeout)返回。因此为了减少误判,你需要将这三种情况加入判断条件。Jetty的方案已经将select返回为0的情况考虑了,但是却没有考虑线程被中断或者Selector被wakeup的情况,在jetty的运行时也许不会有这两种情况的发生,不过我在windows上用jdk 6u7跑jetty的时候就发现了误判的日志产生。除了wakeup和线程中断这两种情形外,为了进一步提高判断效率,应该将操作系统版本和jdk版本考虑进来,如果是非linux系统直接不进行后续的判断,如果是jdk6u4以后版本也直接忽略判断,因此yanf4j里的实现大致如下:
boolean seeing = false;
/**
* 非linux系统或者超过java6u4版本,直接返回
*/
if (!SystemUtils.isLinuxPlatform()
|| SystemUtils.isAfterJava6u4Version()) {
return seeing;
}
/**
* 判断是否发生BUG的要素:
* (1)select返回为0
* (2)wait时间大于0
* (3)select耗时小于一定值
* (4)非wakeup唤醒
* (5)非线程中断引起
*/
if (JVMBUG_THRESHHOLD > 0 && selected == 0
&& wait > JVMBUG_THRESHHOLD && now - before < wait / 4
&& !this.wakenUp.get() /* waken up */
&& !Thread.currentThread().isInterrupted()/* Interrupted */) {
this.jvmBug.incrementAndGet();
其中判断是否是线程中断引起的是通过Thread.currentThread().isInterrupted(),判断是否是wakeup是通过一个原子变量wakenUp,当调调用Selector.wakeup时候,这个原子变量更新为true。判断操作系统和jdk版本是通过System.getProperty得到系统属性做字符串处理即可。类似的代码示例:
public static final String OS_NAME = System.getProperty("os.name");
private static boolean isLinuxPlatform = false;
static {
if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) {
isLinuxPlatform = true;
}
}
public static final String JAVA_VERSION = System
.getProperty("java.version");
private static boolean isAfterJava6u4Version = false;
static {
if (JAVA_VERSION != null) {
// java4 or java5
if (JAVA_VERSION.indexOf("1.4.") >= 0
|| JAVA_VERSION.indexOf("1.5.") >= 0)
isAfterJava6u4Version = false;
// if it is java6,check sub version
else if (JAVA_VERSION.indexOf("1.6.") >= 0) {
int index = JAVA_VERSION.indexOf("_");
if (index > 0) {
String subVersionStr = JAVA_VERSION.substring(index + 1);
if (subVersionStr != null && subVersionStr.length() > 0) {
try {
int subVersion = Integer.parseInt(subVersionStr);
if (subVersion >= 4)
isAfterJava6u4Version = true;
} catch (NumberFormatException e) {
}
}
}
// after java6
} else
isAfterJava6u4Version = true;
}
}
随着并发数量的提高,传统nio框架采用一个Selector来支撑大量连接事件的管理和触发已经遇到瓶颈,因此现在各种nio框架的新版本都采用多个Selector并存的结构,由多个Selector均衡地去管理大量连接。这里以Mina和Grizzly的实现为例。
在Mina 2.0中,Selector的管理是由org.apache.mina.transport.socket.nio.NioProcessor来处理,每个NioProcessor对象保存一个Selector,负责具体的select、wakeup、channel的注册和取消、读写事件的注册和判断、实际的IO读写操作等等,核心代码如下:
public NioProcessor(Executor executor) {
super(executor);
try {
// Open a new selector
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeIoException("Failed to open a selector.", e);
}
}
protected int select(long timeout) throws Exception {
return selector.select(timeout);
}
protected boolean isInterestedInRead(NioSession session) {
SelectionKey key = session.getSelectionKey();
return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
}
protected boolean isInterestedInWrite(NioSession session) {
SelectionKey key = session.getSelectionKey();
return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
}
protected int read(NioSession session, IoBuffer buf) throws Exception {
return session.getChannel().read(buf.buf());
}
protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
if (buf.remaining() <= length) {
return session.getChannel().write(buf.buf());
} else {
int oldLimit = buf.limit();
buf.limit(buf.position() + length);
try {
return session.getChannel().write(buf.buf());
} finally {
buf.limit(oldLimit);
}
}
}
这些方法的调用都是通过AbstractPollingIoProcessor来处理,这个类里可以看到一个nio框架的核心逻辑,注册、select、派发,具体因为与本文主题不合,不再展开。NioProcessor的初始化是在NioSocketAcceptor的构造方法中调用的:
public NioSocketAcceptor() {
super(new DefaultSocketSessionConfig(), NioProcessor.class);
((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
直接调用了父类AbstractPollingIoAcceptor的构造函数,在其中我们可以看到,默认是启动了一个SimpleIoProcessorPool来包装NioProcessor:
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
Class<? extends IoProcessor<T>> processorClass) {
this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
true);
}
这里其实是一个组合模式,SimpleIoProcessorPool和NioProcessor都实现了Processor接口,一个是组合形成的Processor池,而另一个是单独的类。调用的SimpleIoProcessorPool的构造函数是这样:
private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
this(processorType, null, DEFAULT_SIZE);
}
可以看到,默认的池大小是cpu个数+1,也就是创建了cpu+1个的Selector对象。它的重载构造函数里是创建了一个数组,启动一个CachedThreadPool来运行NioProcessor,通过反射创建具体的Processor对象,这里就不再列出了。
Mina当有一个新连接建立的时候,就创建一个NioSocketSession,并且传入上面的SimpleIoProcessorPool,当连接初始化的时候将Session加入SimpleIoProcessorPool:
protected NioSession accept(IoProcessor<NioSession> processor,
ServerSocketChannel handle) throws Exception {
SelectionKey key = handle.keyFor(selector);
if ((key == null) || (!key.isValid()) || (!key.isAcceptable()) ) {
return null;
}
// accept the connection from the client
SocketChannel ch = handle.accept();
if (ch == null) {
return null;
}
return new NioSocketSession(this, processor, ch);
}
private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
T session = accept(processor, handle);
if (session == null) {
break;
}
initSession(session, null, null);
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
加入的操作是递增一个整型变量并且模数组大小后对应的NioProcessor注册到session里:
private IoProcessor<T> nextProcessor() {
checkDisposal();
return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
}
if (p == null) {
p = nextProcessor();
IoProcessor<T> oldp =
(IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
if (oldp != null) {
p = oldp;
}
}
这样一来,每个连接都关联一个NioProcessor,也就是关联一个Selector对象,避免了所有连接共用一个Selector负载过高导致server响应变慢的后果。但是注意到NioSocketAcceptor也有一个Selector,这个Selector用来干什么的呢?那就是集中处理OP_ACCEPT事件的Selector,主要用于连接的接入,不跟处理读写事件的Selector混在一起,因此Mina的默认open的Selector是cpu+2个。
看完mina2.0之后,我们来看看Grizzly2.0是怎么处理的,Grizzly还是比较保守,它默认就是启动两个Selector,其中一个专门负责accept,另一个负责连接的IO读写事件的管理。Grizzly 2.0中Selector的管理是通过SelectorRunner类,这个类封装了Selector对象以及核心的分发注册逻辑,你可以将他理解成Mina中的NioProcessor,核心的代码如下:
protected boolean doSelect() {
selectorHandler = transport.getSelectorHandler();
selectionKeyHandler = transport.getSelectionKeyHandler();
strategy = transport.getStrategy();
try {
if (isResume) {
// If resume SelectorRunner - finish postponed keys
isResume = false;
if (keyReadyOps != 0) {
if (!iterateKeyEvents()) return false;
}
if (!iterateKeys()) return false;
}
lastSelectedKeysCount = 0;
selectorHandler.preSelect(this);
readyKeys = selectorHandler.select(this);
if (stateHolder.getState(false) == State.STOPPING) return false;
lastSelectedKeysCount = readyKeys.size();
if (lastSelectedKeysCount != 0) {
iterator = readyKeys.iterator();
if (!iterateKeys()) return false;
}
selectorHandler.postSelect(this);
} catch (ClosedSelectorException e) {
notifyConnectionException(key,
"Selector was unexpectedly closed", e,
Severity.TRANSPORT, Level.SEVERE, Level.FINE);
} catch (Exception e) {
notifyConnectionException(key,
"doSelect exception", e,
Severity.UNKNOWN, Level.SEVERE, Level.FINE);
} catch (Throwable t) {
logger.log(Level.SEVERE,"doSelect exception", t);
transport.notifyException(Severity.FATAL, t);
}
return true;
}
基本上是一个reactor实现的样子,在AbstractNIOTransport类维护了一个SelectorRunner的数组,而Grizzly用于创建tcp server的类TCPNIOTransport正是继承于AbstractNIOTransport类,在它的start方法中调用了startSelectorRunners来创建并启动SelectorRunner数组:
private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;
@Override
public void start() throws IOException {
if (selectorRunnersCount <= 0) {
selectorRunnersCount = DEFAULT_SELECTOR_RUNNERS_COUNT;
}
startSelectorRunners();
}
protected void startSelectorRunners() throws IOException {
selectorRunners = new SelectorRunner[selectorRunnersCount];
synchronized(selectorRunners) {
for (int i = 0; i < selectorRunnersCount; i++) {
SelectorRunner runner =
new SelectorRunner(this, SelectorFactory.instance().create());
runner.start();
selectorRunners[i] = runner;
}
}
}
可见Grizzly并没有采用一个单独的池对象来管理SelectorRunner,而是直接采用数组管理,默认数组大小是2。SelectorRunner实现了Runnable接口,它的start方法调用了一个线程池来运行自身。刚才我提到了说Grizzly的Accept是单独一个Selector来管理的,那么是如何表现的呢?答案在RoundRobinConnectionDistributor类,这个类是用于派发注册事件到相应的SelectorRunner上,它的派发方式是这样:
public Future<RegisterChannelResult> registerChannelAsync(
SelectableChannel channel, int interestOps, Object attachment,
CompletionHandler completionHandler)
throws IOException {
SelectorRunner runner = getSelectorRunner(interestOps);
return transport.getSelectorHandler().registerChannelAsync(
runner, channel, interestOps, attachment, completionHandler);
}
private SelectorRunner getSelectorRunner(int interestOps) {
SelectorRunner[] runners = getTransportSelectorRunners();
int index;
if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
index = 0;
} else {
index = (counter.incrementAndGet() % (runners.length - 1)) + 1;
}
return runners[index];
}
getSelectorRunner这个方法道出了秘密,如果是OP_ACCEPT,那么都使用数组中的第一个SelectorRunner,如果不是,那么就通过取模运算的结果+1从后面的SelectorRunner中取一个来注册。
分析完mina2.0和grizzly2.0对Selector的管理后我们可以得到几个启示:
1、在处理大量连接的情况下,多个Selector比单个Selector好
2、多个Selector的情况下,处理OP_READ和OP_WRITE的Selector要与处理OP_ACCEPT的Selector分离,也就是说处理接入应该要一个单独的Selector对象来处理,避免IO读写事件影响接入速度。
3、Selector的数目问题,mina默认是cpu+2,而grizzly总共就2个,我更倾向于mina的策略,但是我认为应该对cpu个数做一个判断,如果CPU个数超过8个,那么更多的Selector线程可能带来比较大的线程切换的开销,mina默认的策略并非合适,幸好可以设置这个数值。
这个BUG会在linux上导致cpu 100%,使得nio server/client不可用,具体的详情可以看这里
http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933 。令人失望的是这个BUG直到jdk 6u4才解决,sun的拖沓让人难以相信。这个BUG在server端容易出现,因为server端有频繁地接入断开连接。
使用jdk 6u4之前版本的nio框架都有这个隐患,除非你的框架很好地处理了这个可能的隐患。Grizzly的处理方式比较简单,也就是BUG报告里面提到的方式,在SelectionKey.cancel()之后马上进行了一次select调用将fd从poll(epoll)中移除:
this.selectionKey.cancel();
try {
// cancel key,then select now to remove file descriptor
this.selector.selectNow();
} catch (IOException e) {
onException(e);
log.error("Selector selectNow fail", e);
}
实际上这样的解决方式还是留有隐患的,因为key的取消和这个selectNow操作很可能跟Selector.select操作并发地在进行,在两个操作之间仍然留有一个极小的时间窗口可能发生这个BUG。因此,你需要更安全地方式处理这个问题,jetty的处理方式是这样,连续的select(timeout)操作没有阻塞并返回0,并且次数超过了一个指定阀值,那么就遍历整个key set,将key仍然有效并且interestOps等于0的所有key主动取消掉;如果在这次修正后,仍然继续出现select(timeout)不阻塞并且返回0的情况,那么就重新创建一个新的Selector,并将Old Selector的有效channel和对应的key转移到新的Selector上,
long before=now;
int selected=selector.select(wait);
now = System.currentTimeMillis();
_idleTimeout.setNow(now);
_timeout.setNow(now);
// Look for JVM bugs
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6403933
if (__JVMBUG_THRESHHOLD>0 && selected==0 && wait>__JVMBUG_THRESHHOLD && (now-before)<(wait/2) )
{
_jvmBug++;
if (_jvmBug>=(__JVMBUG_THRESHHOLD2))
{
synchronized (this)
{
_lastJVMBug=now;
// BLOODY SUN BUG !!! Try refreshing the entire selector.
final Selector new_selector = Selector.open();
for (SelectionKey k: selector.keys())
{
if (!k.isValid() || k.interestOps()==0)
continue;
final SelectableChannel channel = k.channel();
final Object attachment = k.attachment();
if (attachment==null)
addChange(channel);
else
addChange(channel,attachment);
}
_selector.close();
_selector=new_selector;
_jvmBug=0;
return;
}
}
else if (_jvmBug==__JVMBUG_THRESHHOLD || _jvmBug==__JVMBUG_THRESHHOLD1)
{
// Cancel keys with 0 interested ops
for (SelectionKey k: selector.keys())
{
if (k.isValid()&&k.interestOps()==0)
{
k.cancel();
}
}
return;
}
}
else
_jvmBug=0;
这个方案能比较好的在jdk 6u4之前的版本上解决这个BUG可能导致的问题。Mina和Netty没有看到有处理这个BUG的代码,如果我看错了,请留言告诉我。Yanf4j一直采用的是grizzly的方式,准备加上jetty的处理方案。当然,最简单的方案就是升级你的JDK :D
泰山在线的周利朋友对xmemcached做了很多测试,他发现了一个比较严重的BUG,在linux平台的重连机制有时候会失效。表现的现象是这样,正常连接上memcached之后,kill掉其中的一台memcched server,xmemcached会开始自动重连这台server直到连接成功,然而事情没有像预想的那样,现象是有时候可以重连成功,有时候却没有,如果设置了connectionPoolSize,有时候建立的连接数达到connectionPoolSize,有时候却没有。他还向我描述了那时候的netstat观察到的网络情况,有比较多CLOSE_WAIT存在,这个显然是由于memcached主动断开,xmemcached被动进入CLOSE_WAIT,但是没有发送FIN的情况,如果有发送FIN那应该进入LAST_ACK而不是停留在CLOSE_WAIT。因此反应的第一个问题是xmemcached没有在接到memcached断开之后主动关闭socket发送FIN。检查代码发现其实是有这个逻辑,但是nio的channel关闭有个隐蔽的问题,就是在SelectionKey.cancel之后还需要调用select才能真正地关闭socket,这里会有个延迟,另外,为了防止CLOSE_WAIT现象的再次发生,设置SO_LINGER选项强制关闭也是必须的。做了这两个修改后,build了一个临时版本请周利朋友帮忙测试,重连失败的情况有所减轻,但是仍然会发生。因此根本的问题不在于CLOSE_WAIT的处理上,通过检查代码发现了下面这段代码:
if(!future.isDone()&&!future.get(DEFAULT_CONNECTION_TIMEOUT,TimeUnit.MILLISECONDS){


}else{
connected=true;
}
可能你已经发现问题在哪。这段代码的意图是通过future.get阻塞等待连接成功或者失败,如果失败做一些处理,如果成功将connected设置为true。这里判断失败有两个条件,future.isDone为false,并且future.get也返回false才认为失败,问题恰恰出在这里,因为future.isDone可能在连接的失败的情况下返回true,而这段逻辑将这种情况误判为连接成功,导致重试的请求被取消。修改很简单,将future.isDone这个条件去掉即可。
回想起来,我也忘了当初为什么加上这个条件,这里感谢下周利的帮助,并且向使用xmemcached的朋友们提个醒。这个问题在win32平台上不会出现(比较诡异,估计跟并发有关),在linux平台出现的几率比较大,预计在10月份发布的1.2.0-stable中修正,这个stable版主要工作是修复BUG。欢迎更多朋友反馈问题和BUG,我将及时修复和反馈。
By starting at the top of the triangle below and moving to adjacent numbers on the row below, the maximum total from top to bottom is 23.
3
7 4
2 4 6
8 5 9 3
That is, 3 + 7 + 4 + 9 = 23.
Find the maximum total from top to bottom of the triangle below:
75
95 64
17 47 82
18 35 87 10
20 04 82 47 65
19 01 23 75 03 34
88 02 77 73 07 63 67
99 65 04 28 06 16 70 92
41 41 26 56 83 40 80 70 33
41 48 72 33 47 32 37 16 94 29
53 71 44 65 25 43 91 52 97 51 14
70 11 33 28 77 73 17 78 39 68 17 57
91 71 52 38 17 14 91 43 58 50 27 29 48
63 66 04 68 89 53 67 30 73 16 69 87 40 31
04 62 98 27 23 09 70 98 73 93 38 53 60 04 23
NOTE: As there are only 16384 routes, it is possible to solve this problem by trying every route. However, Problem 67, is the same challenge with a triangle containing one-hundred rows; it cannot be solved by brute force, and requires a clever method! ;o)
最简单的方法就是穷举,从根节点出发,每个节点都有两个分叉,到达底部的路径有估计有2的指数级的数目(有会算的朋友请留言,我的组合数学都还给老师了),不过这道题显然是符合动态规划的特征,往下递增一层的某个节点的最佳结果f[i][j]肯定是上一层两个入口节点对应的最佳结果的最大值,也就是f[i-1][j]或者f[i-1][j+1],递归的边界就是定点f[0][0]=75。因此我的解答如下,考虑了金字塔边界的情况,数据按照金字塔型存储在numbers.txt中,
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class Euler18Problem {
public static void maxSun(int[][] a, int rows, int cols) {
// 结果列表
int[][] f = new int[15][15];
// 路径,用于输出计算路径
int[][] path = new int[15][15];
// 递归边界
f[0][0] = a[0][0];
path[0][0] = 0;
// 递推
for (int i = 1; i < rows; i++) {
int col = i + 1;
// 决策
for (int j = 0; j < col; j++) {
// 左边界
if (j - 1 < 0) {
f[i][j] = f[i - 1][j] + a[i][j];
path[i][j] = j;
} else if (j + 1 > col) { // 右边界
f[i][j] = f[i - 1][j - 1] + a[i][j];
path[i][j] = j - 1;
} else {
// 处于中间位置
if (f[i - 1][j] <= f[i - 1][j - 1]) {
f[i][j] = f[i - 1][j - 1] + a[i][j];
path[i][j] = j - 1;
} else {
f[i][j] = f[i - 1][j] + a[i][j];
path[i][j] = j;
}
}
}
}
// 求出结果
int result = 0, col = 0;
for (int i = 0; i < cols; i++) {
if (f[14][i] > result) {
result = f[14][i];
col = i;
}
}
// 输出路径
System.out.println("row=14,col=" + col + ",value=" + a[14][col]);
for (int i = rows - 2; i >= 0; i--) {
col = path[i][col];
System.out.println("row=" + i + ",col=" + col + ",value="
+ a[i][col]);
}
System.out.println(result);
}
public static void main(String[] args) throws Exception {
int rows = 15;
int cols = 15;
int[][] a = new int[rows][cols];
BufferedReader reader = new BufferedReader(new InputStreamReader(
Euler18Problem.class.getResourceAsStream("/numbers.txt")));
String line = null;
int row = 0;
while ((line = reader.readLine()) != null && !line.trim().equals("")) {
String[] numbers = line.split(" ");
for (int i = 0; i < numbers.length; i++) {
a[row][i] = Integer.parseInt(numbers[i]);
}
row++;
}
reader.close();
maxSun(a, rows, cols);
}
}
执行结果如下,包括了路径输出:
row=14,col=9,value=93
row=13,col=8,value=73
row=12,col=7,value=43
row=11,col=6,value=17
row=10,col=5,value=43
row=9,col=4,value=47
row=8,col=3,value=56
row=7,col=3,value=28
row=6,col=3,value=73
row=5,col=2,value=23
row=4,col=2,value=82
row=3,col=2,value=87
row=2,col=1,value=47
row=1,col=0,value=95
row=0,col=0,value=75
1074
ps.并非我闲的蛋疼在半夜做题,只是被我儿子折腾的无法睡觉了,崩溃。
本以为在上篇定稿的参数后应该能有比较好的表现,然后实际的表现大出我的意料,cms回收触发非常频繁,虽然每次都只是10-50毫秒,但是次数12个小时内能达到180多次,这显然不正常。通过gc日志和jstat可以看到,每次old区还在5%左右就开始进行CMS,此时的perm区也才30%,这两个数字有浮动并且CMS触发的时间上也没有规律,在测试环境和生产环境中都是如此。
那么最后是怎么解决的呢?其实没有解决。我只是替换了一个参数就没再发生这个现象,上文提到为了避免System.gc()调用引起的full gc,使用了jdk6引入的新参数-XX:+ExplicitGCInvokesConcurrent来让System.gc()并发执行,但是测试表明恰恰是这个参数引起了CMS的频繁发生,去掉这个参数就没有那个奇特的现象。重复检查了代码,并且再次查看了GC日志,没有再发现有System.gc()的调用,我暂时将原因归结于使用了ExplicitGCInvokesConcurrent参数后其他方法触发了CMS,如果有知晓的朋友请留言告知,最后的方案还是彻底禁掉了显式GC调用。最终定稿的参数:
-server -Xms1536m -Xmx1536m -XX:NewSize=256m -XX:MaxNewSize=256m
-XX:PermSize=64m -XX:MaxPermSize=64m -XX:+UseConcMarkSweepGC
-XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70
-XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0
-XX:+CMSClassUnloadingEnabled -XX:+DisableExplicitGC
-XX:SurvivorRatio=8
删除了+CMSPermGenSweepingEnabled,这个参数在jdk6上跟-XX:+CMSClassUnloadingEnabled作用重叠了,如果你还跑在jdk5上面,那么应该使用这个参数。救助空间设置为NewSize的1/10,也就是25M左右,让年轻代尽量回收,防止年轻对象跑到年老代过早触发CMS甚至full gc。CMS的触发阀值下降到70%,因为年老代增长较慢,宁愿回收次数多一点,降低长暂停的可能。
24小时内的某台生产机器的表现,通过jstat观察:
S0 S1 E O P YGC YGCT FGC FGCT GCT
39.70 0.00 5.59 15.15 28.99 20260 326.041 14 0.592 326.633
39.70 0.00 65.49 15.15 28.99 20260 326.041 14 0.592 326.633
0.00 36.93 19.37 15.16 29.01 20261 326.059 14 0.592 326.650
0.00 36.93 93.23 15.16 29.01 20261 326.059 14 0.592 326.650
34.04 0.00 59.62 15.17 29.01 20262 326.076 14 0.592 326.668
0.00 38.55 12.76 15.19 29.01 20263 326.094 14 0.592 326.686
0.00 38.55 65.48 15.19 29.01 20263 326.094 14 0.592 326.686
CMS两次暂停时间总和在100ms以下,minor gc平均一次执行花了16ms,平均3-4秒发生一次。暂时来看还不错,也许还可以适当调小一下NewSize,加快以下minor gc。
此次调整总共花了大概一周多的时间,由于经验不足,还是走了不少弯路,幸好最终的结果还可以,也让自己对cms gc有比较深入的了解。我们的系统在周4晚上已经全部更新上线,从内部测试、压测、日常测试、beta测试以来,每个阶段都发现几个隐蔽的问题,在上线后暂时没有再发现问题,证明这个流程还是很有意义的,我过去对流程充满偏见,现在看来是可笑的。总结我在淘宝5个月越来学习到的东西,几个关键词:认真、负责、细心、快乐。
在初步确定CMS参数后,系统运行了几天,今天尝试在线上打开了GC日志,按阿宝同学的说法是gc日志的开销比之jstat还小,打开之后发现确实影响很小。打开GC日志之后又发现几个隐藏的问题比较有价值,这里记录下。
首先是系统在启动的时候有一次System.gc()调用引起的full gc,日志输出类似这样:
1.201: [Full GC (System) 1.201: [CMS: 0K->797K(1310720K), 0.1090540 secs] 29499K->797K(1546688K), [CMS Perm : 5550K->5547K(65536K)], 0.1091860 secs] [Times: user=0.05 sys=0.06, real=0.11 secs]
可以确认的是我们系统里的代码绝对没有调用System.gc()方法,但是不保证第三方代码有调用,通过搜索代码引用,后来定位到了mina的ByteBuffer创建上面。
Mina 1.1封装的ByteBuffer的allocate()方法默认创建的是Direct ByteBuffer,而DirectByteBuffer的构造函数里调用了
Bits.reserveMemory(cap);
这个方法强制调用了System.gc():
static void reserveMemory(long size) {
synchronized (Bits.class) {
if (!memoryLimitSet && VM.isBooted()) {
maxMemory = VM.maxDirectMemory();
memoryLimitSet = true;
}
if (size <= maxMemory - reservedMemory) {
reservedMemory += size;
return;
}
}
System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException x) {
// Restore interrupt status
Thread.currentThread().interrupt();
}
synchronized (Bits.class) {
if (reservedMemory + size > maxMemory)
throw new OutOfMemoryError("Direct buffer memory");
reservedMemory += size;
}
}
调用这个方法是为了用户对Direct ByteBuffer的内存可控。而在我们系统中使用的通讯层初始化Decoder的时候通过Mina 1.1创建了一个Direct ByteBuffer,导致了这一次强制的full gc。这个Buffer因为是长期持有的,因此创建Direct类型也还可以接受。
但是在这次GC后,又发现每隔一个小时就有一次System.gc()引起的full gc,这就太难以忍受了,日志大概是这样,注意间隔时间都是3600秒左右:
10570.672: [Full GC (System) 10570.672: [CMS: 779199K->107679K(1310720K), 1.2957430 secs] 872163K->107679K(1546688K), [CMS Perm : 23993K->15595K(65536K)], 1.2959630 secs] [Times: user=1.27 sys=0.02, real=1.30 secs]
14171.971: [Full GC (System) 14171.971: [CMS: 680799K->83681K(1310720K), 1.0171580 secs] 836740K->83681K(1546688K), [CMS Perm : 20215K->15599K(65536K)], 1.0173850 secs] [Times: user=0.97 sys=0.01, real=1.02 secs]
17774.020: [Full GC (System) 17774.020: [CMS: 676201K->79331K(1310720K), 0.9652670 secs] 817596K->79331K(1546688K), [CMS Perm : 22808K->15619K(65536K)], 0.9655150 secs] [Times: user=0.93 sys=0.02, real=0.97 secs]
21374.989: [Full GC (System) 21374.989: [CMS: 677818K->78590K(1310720K), 0.9297080 secs] 822317K->78590K(1546688K), [CMS Perm : 16435K->15593K(65536K)], 0.9299620 secs] [Times: user=0.89 sys=0.01, real=0.93 secs]
24976.948: [Full GC (System) 24976.948: [CMS: 659511K->77608K(1310720K), 0.9255360 secs] 794004K->77608K(1546688K), [CMS Perm : 22359K->15594K(65536K)], 0.9257760 secs] [Times: user=0.88 sys=0.02, real=0.93 secs]
28578.892: [Full GC (System) 28578.892: [CMS: 562058K->77572K(1310720K), 0.8365500 secs] 735072K->77572K(1546688K), [CMS Perm : 15840K->15610K(65536K)], 0.8367990 secs] [Times: user=0.82 sys=0.00, real=0.84 secs]
32179.731: [Full GC (System) 32179.732: [CMS: 549874K->77224K(1310720K), 0.7864400 secs] 561803K->77224K(1546688K), [CMS Perm : 16016K->15597K(65536K)], 0.7866540 secs] [Times: user=0.75 sys=0.01, real=0.79 secs]
搜遍了源码和依赖库,没有再发现显式的gc调用,问题只能出在运行时上,突然想起我们的系统使用RMI暴露JMX给监控程序,监控程序通过RMI连接JMX监控系统和告警等,会不会是
RMI的分布式垃圾收集导致的?果然,一查资料,RMI的分布式收集会强制调用System.gc()来进行分布式GC,server端的间隔恰好是一个小时,这个参数可以通过:
-Dsun.rmi.dgc.server.gcInterval=3600000
来调整。调长时间是一个解决办法,但是我们更希望能不出现显式的GC调用,禁止显式GC调用通过
-XX:+DisableExplicitGC是一个办法,但是禁止了分布式GC会导致什么问题却是心理没底,毕竟我们的JMX调用还是很频繁的,幸运的是JDK6还提供了另一个选项-XX:+ExplicitGCInvokesConcurrent,允许System.gc()也并发运行,调整DGC时间间隔加上这个选项双管齐下彻底解决了full gc的隐患。
打开GC日志后发现的另一个问题是remark的时间过长,已经启用了并行remark,但是时间还是经常超过200毫秒,这个可能的原因有两个:我们的年老代太大或者触发CMS的阀值太高了,CMS进行的时候年老代里的对象已经太多。初步的计划是调小-XX:SurvivorRatio增大救助空间并且降低-XX:CMSInitiatingOccupancyFraction这个阀值。此外,还找到另一个可选参数
-XX:+CMSScavengeBeforeRemark,启用这个选项后,强制remark之前开始一次minor gc,减少remark的暂停时间,但是在remark之后也将立即开始又一次相对较长时间minor gc,如果你的minor gc很快的话可以考虑下这个选项,暂未实验。
首先感谢阿宝同学的帮助,我才对这个gc算法的调整有了一定的认识,而不是停留在过去仅仅了解的阶段。在读过sun的文档和跟阿宝讨论之后,做个小小的总结。
CMS,全称Concurrent Low Pause Collector,是jdk1.4后期版本开始引入的新gc算法,在jdk5和jdk6中得到了进一步改进,它的主要适合场景是对响应时间的重要性需求大于对吞吐量的要求,能够承受垃圾回收线程和应用线程共享处理器资源,并且应用中存在比较多的长生命周期的对象的应用。CMS是用于对tenured generation的回收,也就是年老代的回收,目标是尽量减少应用的暂停时间,减少full gc发生的几率,利用和应用程序线程并发的垃圾回收线程来标记清除年老代。在我们的应用中,因为有缓存的存在,并且对于响应时间也有比较高的要求,因此希望能尝试使用CMS来替代默认的server型JVM使用的并行收集器,以便获得更短的垃圾回收的暂停时间,提高程序的响应性。
CMS并非没有暂停,而是用两次短暂停来替代串行标记整理算法的长暂停,它的收集周期是这样:
初始标记(CMS-initial-mark) -> 并发标记(CMS-concurrent-mark) -> 重新标记(CMS-remark) -> 并发清除(CMS-concurrent-sweep) ->并发重设状态等待下次CMS的触发(CMS-concurrent-reset)。
其中的1,3两个步骤需要暂停所有的应用程序线程的。第一次暂停从root对象开始标记存活的对象,这个阶段称为初始标记;第二次暂停是在并发标记之后,暂停所有应用程序线程,重新标记并发标记阶段遗漏的对象(在并发标记阶段结束后对象状态的更新导致)。第一次暂停会比较短,第二次暂停通常会比较长,并且remark这个阶段可以并行标记。
而并发标记、并发清除、并发重设阶段的所谓并发,是指
一个或者多个垃圾回收线程和应用程序线程并发地运行,垃圾回收线程不会暂停应用程序的执行,如果你有多于一个处理器,那么并发收集线程将与应用线程在不同的处理器上运行,显然,这样的开销就是会降低应用的吞吐量。Remark阶段的
并行,是指暂停了所有应用程序后,启动一定数目的垃圾回收进程进行并行标记,此时的应用线程是暂停的。
CMS的young generation的回收采用的仍然是并行复制收集器,这个跟Paralle gc算法是一致的。
下面是参数介绍和遇到的问题总结,
1、启用CMS:
-XX:+UseConcMarkSweepGC。 咳咳,这里犯过一个低级错误,竟然将+号写成了-号
2。CMS默认启动的回收线程数目是 (ParallelGCThreads + 3)/4)
,如果你需要明确设定,可以通过-XX:ParallelCMSThreads=20来设定,其中ParallelGCThreads是年轻代的并行收集线程数
3、CMS是不会整理堆碎片的,因此为了防止堆碎片引起full gc,通过会开启CMS阶段进行合并碎片选项:
-XX:+UseCMSCompactAtFullCollection,开启这个选项一定程度上会影响性能,阿宝的blog里说也许可以通过配置适当的CMSFullGCsBeforeCompaction来调整性能,未实践。
4.为了减少第二次暂停的时间,开启并行remark:
-XX:+CMSParallelRemarkEnabled,如果remark还是过长的话,可以开启
-XX:+CMSScavengeBeforeRemark选项,强制remark之前开始一次minor gc,减少remark的暂停时间,但是在remark之后也将立即开始又一次minor gc。
5.为了避免Perm区满引起的full gc,建议开启CMS回收Perm区选项:
+CMSPermGenSweepingEnabled -XX:+CMSClassUnloadingEnabled
6.默认CMS是在tenured generation沾满68%的时候开始进行CMS收集,如果你的年老代增长不是那么快,并且希望降低CMS次数的话,可以适当调高此值:
-XX:CMSInitiatingOccupancyFraction=80
这里修改成80%沾满的时候才开始CMS回收。
7.年轻代的并行收集线程数默认是
(ncpus <= 8) ? ncpus : 3 + ((ncpus * 5) / 8),如果你希望设定这个线程数,可以通过
-XX:ParallelGCThreads= N 来调整。
8.进入重点,在初步设置了一些参数后,例如:
-server -Xms1536m -Xmx1536m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:PermSize=64m
-XX:MaxPermSize=64m -XX:-UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection
-XX:CMSInitiatingOccupancyFraction=80 -XX:+CMSParallelRemarkEnabled
-XX:SoftRefLRUPolicyMSPerMB=0
需要在生产环境或者压测环境中测量这些参数下系统的表现,这时候需要打开GC日志查看具体的信息,因此加上参数:
-verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Xloggc:/home/test/logs/gc.log
在运行相当长一段时间内查看CMS的表现情况,CMS的日志输出类似这样:
4391.322: [GC [1 CMS-initial-mark: 655374K(1310720K)] 662197K(1546688K), 0.0303050 secs] [Times: user=0.02 sys=0.02, real=0.03 secs]
4391.352: [CMS-concurrent-mark-start]
4391.779: [CMS-concurrent-mark: 0.427/0.427 secs] [Times: user=1.24 sys=0.31, real=0.42 secs]
4391.779: [CMS-concurrent-preclean-start]
4391.821: [CMS-concurrent-preclean: 0.040/0.042 secs] [Times: user=0.13 sys=0.03, real=0.05 secs]
4391.821: [CMS-concurrent-abortable-preclean-start]
4392.511: [CMS-concurrent-abortable-preclean: 0.349/0.690 secs] [Times: user=2.02 sys=0.51, real=0.69 secs]
4392.516: [GC[YG occupancy: 111001 K (235968 K)]4392.516: [Rescan (parallel) , 0.0309960 secs]4392.547: [weak refs processing, 0.0417710 secs] [1 CMS-remark: 655734K(1310720K)] 766736K(1546688K), 0.0932010 secs] [Times: user=0.17 sys=0.00, real=0.09 secs]
4392.609: [CMS-concurrent-sweep-start]
4394.310: [CMS-concurrent-sweep: 1.595/1.701 secs] [Times: user=4.78 sys=1.05, real=1.70 secs]
4394.310: [CMS-concurrent-reset-start]
4394.364: [CMS-concurrent-reset: 0.054/0.054 secs] [Times: user=0.14 sys=0.06, real=0.06 secs]
其中可以看到CMS-initial-mark阶段暂停了0.0303050秒,而CMS-remark阶段暂停了0.0932010秒,因此两次暂停的总共时间是0.123506秒,也就是123毫秒左右。两次短暂停的时间之和在200以下可以称为正常现象。
但是你很可能遇到
两种fail引起full gc:Prommotion failed和Concurrent mode failed。
Prommotion failed的日志输出大概是这样:
[ParNew (promotion failed): 320138K->320138K(353920K), 0.2365970 secs]42576.951: [CMS: 1139969K->1120688K(
2166784K), 9.2214860 secs] 1458785K->1120688K(2520704K), 9.4584090 secs]
这个问题的产生是由于救助空间不够,从而向年老代转移对象,年老代没有足够的空间来容纳这些对象,导致一次full gc的产生。解决这个问题的办法有两种完全相反的倾向:
增大救助空间、增大年老代或者去掉救助空间。增大救助空间就是调整-XX:SurvivorRatio参数,这个参数是Eden区和Survivor区的大小比值,默认是32,也就是说Eden区是Survivor区的32倍大小,要注意Survivo是有两个区的,因此Surivivor其实占整个young genertation的1/34。调小这个参数将增大survivor区,让对象尽量在survitor区呆长一点,减少进入年老代的对象。去掉救助空间的想法是让大部分不能马上回收的数据尽快进入年老代,加快年老代的回收频率,减少年老代暴涨的可能性,这个是通过将-XX:SurvivorRatio 设置成比较大的值(比如65536)来做到。在我们的应用中,将young generation设置成256M,这个值相对来说比较大了,而救助空间设置成默认大小(1/34),从压测情况来看,没有出现prommotion failed的现象,年轻代比较大,从GC日志来看,minor gc的时间也在5-20毫秒内,还可以接受,因此暂不调整。
Concurrent mode failed的产生是由于CMS回收年老代的速度太慢,导致年老代在CMS完成前就被沾满,引起full gc,避免这个现象的产生就是调小
-XX:CMSInitiatingOccupancyFraction参数的值,让CMS更早更频繁的触发,降低年老代被沾满的可能。我们的应用暂时负载比较低,在生产环境上年老代的增长非常缓慢,因此暂时设置此参数为80。在压测环境下,这个参数的表现还可以,没有出现过Concurrent mode failed。
参考资料:
《
JDK5.0垃圾收集优化之--Don't Pause》 by 江南白衣
《记一次Java GC调整经历》
1,
2 by Arbow
Java SE 6 HotSpot[tm] Virtual Machine Garbage Collection Tuning
Tuning Garbage Collection
with the 5.0 JavaTM Virtual Machine
按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。如何区分呢?首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。
Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:
java.nio.channels.AsynchronousChannel
标记一个channel支持异步IO操作。
java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。
java.nio.channels.AsynchronousSocketChannel
面向流的异步socket channel,表示一个连接。
java.nio.channels.AsynchronousChannelGroup
异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler。AsynchronousServerSocketChannel创建的时候可以传入一个AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的AsynchronousSocketChannel将
同属于一个组,共享资源。
java.nio.channels.CompletionHandler
异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,我更推荐用CompletionHandler的方式,这些handler的调用是由AsynchronousChannelGroup的线程池派发的。显然,
线程池的大小是性能的关键因素。AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:
public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,
ThreadFactory threadFactory)
throws IOException
public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,
int initialSize)
public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)
throws IOException
需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。
在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reacot负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:
Reactor: send(msg) -> 消息队列是否为空,如果为空 -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。
Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:
public interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
void cancelled(A attachment);
}
其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。
在初步介绍完aio引入的类和接口后,我们看看一个典型的tcp服务端是怎么启动的,怎么接受连接并处理读和写,这里引用的代码都是yanf4j 的aio分支中的代码,可以从svn checkout,svn地址:
http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
第一步,创建一个AsynchronousServerSocketChannel,创建之前先创建一个AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以绑定一个AsynchronousChannelGroup,那么通过这个AsynchronousServerSocketChannel建立的连接都将同属于一个AsynchronousChannelGroup并共享资源:
this.asynchronousChannelGroup = AsynchronousChannelGroup
.withCachedThreadPool(Executors.newCachedThreadPool(),
this.threadPoolSize);
然后初始化一个AsynchronousServerSocketChannel,通过open方法:
this.serverSocketChannel = AsynchronousServerSocketChannel
.open(this.asynchronousChannelGroup);
通过nio 2.0引入的SocketOption类设置一些TCP选项:
this.serverSocketChannel
.setOption(
StandardSocketOption.SO_REUSEADDR,true);
this.serverSocketChannel
.setOption(
StandardSocketOption.SO_RCVBUF,16*1024);
绑定本地地址:
this.serverSocketChannel
.bind(new InetSocketAddress("localhost",8080), 100);
其中的100用于指定等待连接的队列大小(backlog)。完了吗?还没有,最重要的
监听工作还没开始,监听端口是为了等待连接上来以便accept产生一个AsynchronousSocketChannel来表示一个新建立的连接,因此需要发起一个accept调用,调用是异步的,操作系统将在连接建立后,将最后的结果——AsynchronousSocketChannel返回给你:
public void pendingAccept() {
if (this.started && this.serverSocketChannel.isOpen()) {
this.acceptFuture = this.serverSocketChannel.accept(null,
new AcceptCompletionHandler());
} else {
throw new IllegalStateException("Controller has been closed");
}
}
注意,重复的accept调用将会抛出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一个参数是你想传给CompletionHandler的attchment,第二个参数就是注册的用于回调的CompletionHandler,最后返回结果Future<AsynchronousSocketChannel>。你可以对future做处理,这里采用更推荐的方式就是注册一个CompletionHandler。那么accept的CompletionHandler中做些什么工作呢?显然一个赤裸裸的AsynchronousSocketChannel是不够的,我们需要将它封装成session,一个session表示一个连接(mina里就叫IoSession了),里面带了一个缓冲的消息队列以及一些其他资源等。在连接建立后,除非你的服务器只准备接受一个连接,不然你需要在后面
继续调用pendingAccept来发起另一个accept请求:
private final class AcceptCompletionHandler implements
CompletionHandler<AsynchronousSocketChannel, Object> {
@Override
public void cancelled(Object attachment) {
logger.warn("Accept operation was canceled");
}
@Override
public void completed(AsynchronousSocketChannel socketChannel,
Object attachment) {
try {
logger.debug("Accept connection from "
+ socketChannel.getRemoteAddress());
configureChannel(socketChannel);
AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);
Session session = new AioTCPSession(sessionConfig,
AioTCPController.this.configuration
.getSessionReadBufferSize(),
AioTCPController.this.sessionTimeout);
session.start();
registerSession(session);
} catch (Exception e) {
e.printStackTrace();
logger.error("Accept error", e);
notifyException(e);
} finally {
pendingAccept();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
logger.error("Accept error", exc);
try {
notifyException(exc);
} finally {
pendingAccept();
}
}
}
注意到了吧,我们在failed和
completed方法中在最后都调用了pendingAccept来继续发起accept调用,等待新的连接上来。有的同学可能要说了,这样搞是不是递归调用,会不会堆栈溢出?实际上不会,因为发起accept调用的线程与CompletionHandler回调的线程并非同一个,不是一个上下文中,两者之间没有耦合关系。要注意到,CompletionHandler的回调共用的是AsynchronousChannelGroup绑定的线程池,因此
千万别在回调方法中调用阻塞或者长时间的操作,例如sleep,回调方法最好能支持超时,防止线程池耗尽。
连接建立后,怎么读和写呢?回忆下在nonblocking nio框架中,连接建立后的第一件事是干什么?注册OP_READ事件等待socket可读。异步IO也同样如此,连接建立后马上发起一个异步read调用,等待socket可读,这个是Session.start方法中所做的事情:
public class AioTCPSession
{
protected void start0() {
pendingRead();
}
protected final void pendingRead() {
if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
if (!this.readBuffer.hasRemaining()) {
this.readBuffer = ByteBufferUtils
.increaseBufferCapatity(this.readBuffer);
}
this.readFuture = this.asynchronousSocketChannel.read(
this.readBuffer, this, this.readCompletionHandler);
} else {
throw new IllegalStateException(
"Session Or Channel has been closed");
}
}


}
AsynchronousSocketChannel的read调用与AsynchronousServerSocketChannel的accept调用类似,同样是非阻塞的,返回结果也是一个Future,但是写的结果是整数,表示写入了多少字节,因此read调用返回的是Future<Integer>,方法的第一个参数是读的缓冲区,操作系统将IO读到数据拷贝到这个缓冲区,第二个参数是传递给CompletionHandler的attchment,第三个参数就是注册的用于回调的CompletionHandler。这里保存了read的结果Future,这是为了在关闭连接的时候能够主动取消调用,accept也是如此。现在可以看看read的CompletionHandler的实现:
public final class ReadCompletionHandler implements
CompletionHandler<Integer, AbstractAioSession> {
private static final Logger log = LoggerFactory
.getLogger(ReadCompletionHandler.class);
protected final AioTCPController controller;
public ReadCompletionHandler(AioTCPController controller) {
this.controller = controller;
}
@Override
public void cancelled(AbstractAioSession session) {
log.warn("Session(" + session.getRemoteSocketAddress()
+ ") read operation was canceled");
}
@Override
public void completed(Integer result, AbstractAioSession session) {
if (log.isDebugEnabled())
log.debug("Session(" + session.getRemoteSocketAddress()
+ ") read +" + result + " bytes");
if (result < 0) {
session.close();
return;
}
try {
if (result > 0) {
session.updateTimeStamp();
session.getReadBuffer().flip();
session.decode();
session.getReadBuffer().compact();
}
} finally {
try {
session.pendingRead();
} catch (IOException e) {
session.onException(e);
session.close();
}
}
controller.checkSessionTimeout();
}
@Override
public void failed(Throwable exc, AbstractAioSession session) {
log.error("Session read error", exc);
session.onException(exc);
session.close();
}
}
如果IO读失败,会返回失败产生的异常,这种情况下我们就主动关闭连接,通过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用:
if (null != this.readFuture) {
this.readFuture.cancel(true);
}
this.asynchronousSocketChannel.close();
在读成功的情况下,我们还需要判断结果result是否小于0,
如果小于0就表示对端关闭了,这种情况下我们也主动关闭连接并返回。如果读到一定字节,也就是result大于0的情况下,我们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终
通过pendingRead继续发起read调用等待socket的下一次可读。可见,我们并不需要自己去调用channel来进行IO读,而是操作系统帮你直接读到了缓冲区,然后给你一个结果表示读入了多少字节,你处理这个结果即可。而nonblocking IO框架中,是reactor通知用户线程socket可读了,然后用户线程自己去调用read进行实际读操作。
这里还有个需要注意的地方,就是decode出来的消息的派发给业务处理器工作最好交给一个线程池来处理,避免阻塞group绑定的线程池。
IO写的操作与此类似,不过通常写的话我们会在session中关联一个缓冲队列来处理,没有完全写入或者等待写入的消息都存放在队列中,队列为空的情况下发起write调用:
protected void write0(WriteMessage message) {
boolean needWrite = false;
synchronized (this.writeQueue) {
needWrite = this.writeQueue.isEmpty();
this.writeQueue.offer(message);
}
if (needWrite) {
pendingWrite(message);
}
}
protected final void pendingWrite(WriteMessage message) {
message = preprocessWriteMessage(message);
if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {
this.asynchronousSocketChannel.write(message.getWriteBuffer(),
this, this.writeCompletionHandler);
} else {
throw new IllegalStateException(
"Session Or Channel has been closed");
}
}
write调用返回的结果与read一样是一个Future<Integer>,而write的CompletionHandler处理的核心逻辑大概是这样:
@Override
public void completed(Integer result, AbstractAioSession session) {
if (log.isDebugEnabled())
log.debug("Session(" + session.getRemoteSocketAddress()
+ ") writen " + result + " bytes");
WriteMessage writeMessage;
Queue<WriteMessage> writeQueue = session.getWriteQueue();
synchronized (writeQueue) {
writeMessage = writeQueue.peek();
if (writeMessage.getWriteBuffer() == null
|| !writeMessage.getWriteBuffer().hasRemaining()) {
writeQueue.remove();
if (writeMessage.getWriteFuture() != null) {
writeMessage.getWriteFuture().setResult(Boolean.TRUE);
}
try {
session.getHandler().onMessageSent(session,
writeMessage.getMessage());
} catch (Exception e) {
session.onException(e);
}
writeMessage = writeQueue.peek();
}
}
if (writeMessage != null) {
try {
session.pendingWrite(writeMessage);
} catch (IOException e) {
session.onException(e);
session.close();
}
}
}
compete方法中的result就是实际写入的字节数,然后我们判断消息的缓冲区是否还有剩余,如果没有就将消息从队列中移除,如果队列中还有消息,那么继续发起write调用。
重复一下,这里引用的代码都是yanf4j aio分支中的源码,感兴趣的朋友可以直接check out出来看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
在引入了aio之后,java对于网络层的支持已经非常完善,该有的都有了,java也已经成为服务器开发的首选语言之一。java的弱项在于对内存的管理上,由于这一切都交给了GC,因此在高性能的网络服务器上还是Cpp的天下。java这种单一堆模型比之erlang的进程内堆模型还是有差距,很难做到高效的垃圾回收和细粒度的内存管理。
这里仅仅是介绍了aio开发的核心流程,对于一个网络框架来说,还需要考虑超时的处理、缓冲buffer的处理、业务层和网络层的切分、可扩展性、性能的可调性以及一定的通用性要求。
MQ在分布式系统中扮演着重要角色,异步的消息通信全要靠它,而异步通信正是提高系统伸缩性的不二良方。说说我认为的一个优秀的MQ产品需要具备的特征。
首先显然是高可用性,我们当然希望MQ能支撑7x24小时应用,而不是三天两头当机,我们要追求的是99.9%的可靠服务时间。要做到高可用性,显然我们需要做MQ的集群,一台当了,不影响整个集群的服务能力,这里涉及到告警、流控、消息的负载均衡、数据库的使用、测试的完备程度等等。
其次是消息存储的高可靠性。我们要保证100%不丢消息。要做到消息存储的高可靠性,不仅仅是MQ的责任,更涉及到硬件、操作系统、语言平台和数据库的一整套方案。许多号称可靠存储的MQ产品其实都不可靠,要知道,硬件错误是常态,如果在硬件错误的情况下还能保证消息的可靠存储这才是难题。这里可能需要用到特殊的存储硬件,特殊的数据库,分布式的数据存储,数据库的分库分表和同步等等。你要考虑消息存储在哪里,是文件系统,还是数据库,是本地文件,还是分布式文件,是搞主辅备份呢还是多主机写入等等。
第三是高可扩展性,MQ集群能很好地支持水平扩展,这就要求我们的节点之间最好不要有通信和数据同步。
第四是性能,性能是实现高可用性的前提,很难想象单机性能极差的MQ组成的集群能在高负载下幸免于难。性能因素跟采用的平台、语言、操作系统、代码质量、数据库、网络息息相关。MQ产品的核心其实是消息的存储,在保证存储安全的前提下如何保证和提高消息入队的效率是性能的关键因素。这里需要开发人员建立起性能观念,不需要你对一行行代码斤斤计较,但是你需要知道这样做会造成什么后果,有没有更好更快的方式,你怎么证明它更好更快。软件实现的性能是一方面,另一方面就是平台相关了,因为MQ本质上是IO密集型的系统,瓶颈在IO,如何优化网络IO、文件IO这需要专门的知识。性能另一个相关因素是消息的调度上,引入消息顺序和消息优先级,允许消息的延迟发送,都将增大消息发送调度的复杂性,如何保证高负载下的调度也是要特别注意的地方。
第五,高可配置性和监控工具的完整,这是一个MQ产品容易忽略的地方。异步通信造成了查找问题的难度,不像同步调用那样有相对明确的时序关系。因此查找异步通信的异常是很困难的,这就需要MQ提供方便的DEBUG工具,查找分析日志的工具,查看消息生命周期的工具,查看系统间依赖关系的工具等等。可定制也是MQ产品非常重要的一方面,可方便地配置各类参数并在集群中同步,并且可动态调整各类参数,这将大大降低维护难度。
一些不成熟的想法,瞎侃。
XMemcached 1.2.0-RC2 released,main highlights:
1、支持
Kestrel。Kestrel是一个scala编写的简单高效的MQ,它是Twitter发布的开源产品,支持memcached协议,但并不完全兼容。更多信息看
这里。Xmemcached提供了一个KestrelCommandFactory,用于对kestrel特性的支持。
2、新增了基于
Election Hash的SessionLocator。Election Hash的详细解释看
这里。简单来说就是每次查找key对应的节点的时候,都计算节点ip+key的MD5值,然后进行排序,取最大者为目标节点。这个算法解决的问题与Consistent Hash类似,但是因为每次都要计算,因此开销会比较大,适合节点数比较少的情况,避免了consistent hash为了节点比较均匀需要引入虚拟节点的问题。测试表明,Election Hash的结果也是比较均匀的,并且在节点增删的情况下能保持与一致性哈希相近的命中率。要使用election hash,请使用
ElectionMemcachedSessionLocator。
3、从RC1版本以来的Bug fixed.
欢迎试用并反馈任何意见和BUG。
Kestrel是一个
scala写的twitter开源的消息中间件,特点是高性能、小巧(2K行代码)、持久存储(记录日志到journal)并且可靠(支持可靠获取)。Kestrel的前身是Ruby写的
Starling项目,后来twitter的开发人员尝试用scala重新实现。它的代码非常简洁并且优雅,推荐一读。
Kestrel采用的协议是
memcached的文本协议,但是并不完全支持所有
memcached协议,也不是完全兼容现有协议。标准的协议它仅支持GET、SET、FLUSH_ALL、STATS,扩展的协议有:
SHUTDOWN 关闭kestrel server
RELOAD 动态重新加载配置文件
DUMP_CONFIG dump配置文件
FLUSH queueName flush某个队列
每个key对应都是一个队列。标准memcached文本协议的支持上也没有完全兼容,
SET不支持flag,因此现有大多数基于flag做序列化的memcached client都无法存储任意java类型到kestrel;FLUSH_ALL返回"Flushed all queues.\r\n"而不是"OK\r\n"。
GET协议支持
阻塞获取和可靠获取,都是在key上作文章,例如你要获取queue1的消息,并且在没有消息的时候等待一秒钟,如果有消息马上返回,超时时间后还没有就返回空,kestrel允许你通过发送
"GET queue1/t=1000\r\n"
来阻塞获取。本来的key应该queue1,这里变成了"queue1/t=1000",因此如果你使用的client有对返回的key和发送的key做校验,那么可能就认为kestrel返回错误。
什么是可靠获取呢?默认的GET是从队列中获取消息后,server端就将该消息从队列中移除,客户端需要自己保证不把这个消息丢失掉,也就是说这里是类似JMS规范中的自动应答(auto-acknowledge),如果客户端在处理这个消息的时候异常崩溃或者在接收消息数据的时候连接断开,那么可能导致这个消息永久丢失。Kestrel的可靠获取就是类似JMS规范中的CLIENT_ACK mode,客户端获取消息后,server将这个消息从队列移除并正常发送给客户端,如果这时候客户端崩溃或者连接断开,那么server将不会确认该消息被消费并且"un-get"这个消息,重新放到队列头部,那么当client重新连接上来的时候还可以获取这个消息;只有当server收到客户端的明确确认消息成功的时候,才将消息移除。这个功能也是通过key做手脚,
"GET queue1/open\r\n" 开始一次可靠获取
"GET queue1/close\r\n" 确认消费成功
你要关闭前一次可靠获取开启新的一次,还可以这样调用
"GET queue1/close/open\r\n"
要注意的是每个连接的client
只能有一个正在执行的可靠获取,关闭一个没有开启的reliable fetch或者在执行一次reliable fetch再次open一个新的获取都将直接返回空。
从kestrel的协议方面,我们可以学习到的一点就是
在做一份协议的时候,如果有多种不同语言的client的话,应该尽量用通用协议,通用协议通常都已经有很多成熟的client可以使用,避免了为私有协议开发不同语言的client;并且我们可以在通用协议上作扩展,例如kestrel在key上面做的花样,通过给key附加不同的属性即可实现一些特殊功能。
XMemcachedClient默认是无法支持kestrel对memcached的协议的扩展,也就是说无法支持阻塞获取、可靠获取和flush_all,这是因为
xmemcached会对返回的key和发送的key做校验,如果不相等就认为解码错误;并且由于kestrel不支持flag,因此无法存储java序列化类型;另外一个问题是,xmemcached(spymemcached)都会将连续的GET协议合并成一个bulk get协议,而kestrel也并不支持bulk get,所以需要关闭这个优化,这个可以通过下列代码关闭:
memcachedClient.setOptimizeGet(false);
Spymemcached似乎不提供这个选项。为了解决序列化问题,我添加了一个新的
KestrelCommandFactory,使用这个CommandFactory后,将默认关闭get优化,并且不对GET返回的key做校验从而支持阻塞获取和可靠获取,并且将在存储的数据之前加上4个字节的flag(整型),因此可以支持存储任意可序列化类型。但是有一些应用只需要存储字符串类型和原生类型,这是为了在不同语言的client之间保持可移植(如存储json数据),那么就不希望在数据之前加上这个flag,关闭这个功能可以通过
memcachedClient.setPrimitiveAsString(true);
方法来设置,所有的原生类型都将调用toString转成字符串来存储,字符串前不再自动附加flag。
KestrelCommandFactory已经提交到
svn trunk,预计在xmemcached 1.2.0-RC2的时候发布。
使用KestrelCommandFactory对kestrel做的性能测试,server和client都跑在linux上,jdk6,单线程单client连续push消息
消息个数 消息长度 是否启用journal 时间 TPS(/s)
500000 256 否 123.0s 4065
500000 1024 否 126.3s 3959
500000 4096 否 120.6s 4145
500000 4096 是 122.1s 4095
500000 8192 是 121.2s 4125
从数据上来看比官方数据好很多,可能机器配置不同。是否启用journal带来的影响似乎很小,写文件都是append,还是比较高效的。
kestrel的项目主页
http://github.com/robey/kestrel
kestrel的wiki页
http://wiki.github.com/robey/kestrel
xmemcached项目主页
http://code.google.com/p/xmemcached/
XMemcached是一个基于java nio的
Memcached Client,正式发布1.2.0-RC1版本。此版本又是一个里程碑版本,开始支持memcached的二进制协议,并添加了几个更有价值的功能。此版本的主要改进如下:
1、
支持完整的memcached binary协议。XMemcached现在已经支持memcached的所有文本协议和二进制协议,成为一个比较完整的java client。Memcached的二进制协议带来更好的性能以及更好的可扩展性。在XMemcached中使用二进制协议,你只要添加一行代码:
XMemcachedClientBuilder builder=
.
builder.setCommandFactory(new BinaryCommandFactory());//此行
或者在Spring配置中增加一行配置:
<bean name="memcachedClient2"
class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean" destroy-method="shutdown">


<!--采用binary command -->
<property name="commandFactory">
<bean class="net.rubyeye.xmemcached.command.BinaryCommandFactory"></bean>
</bean>
2.
支持与hibernate-memcached的集成。
Hibernate-memcached是可以将memcached作为hibernate二级缓存的开源项目,它默认采用的是Spymemcached,XMemcached 1.2.0开始提供对它的集成,具体的配置信息参考
这里。
3.
兼容JDK5。XMemcached的1.x版本都仅能在jdk6上使用,从1.2.0-RC1开始,XMemcached开始兼容jdk5。当时考虑只支持jdk6是由于nio的Epoll Selector实现是在jdk6上成为默认,而jdk5需要设置环境变量。不过XMemcached 1.2.0-RC1将自动帮你判断是否是linux平台,并且判断是否可以启用epoll,如果可以,那么将在linux平台采用EPollSelectorProvider,这一切对用户来说是透明的。(注意,jdk5的低版本在linux平台仍然是没有epoll实现的)。
4.
日志从common-logging迁移到slf4j。XMemcached现在必须的两个依赖包分别是
slf4j和
yanf4j(1.0-SNAPSHOT).
5.另一个关键性的改进是
允许设置连接池。众所周知,nio的client默认一般都是一个连接,传统的阻塞io采用连接池的方式提高效率。但是在典型的高并发场景下,nio的单连接也将遇到瓶颈,此时允许设置连接池将是一个可选的调优手段。XMemcached 1.2.0-RC1支持设置连接池,允许对同一个memcached节点建立多个连接,启用的代码如下:
MemcachedClient mc =
.
mc.setConnectionPoolSize(2);
默认的pool size是1。设置这一数值不一定能提高性能,请依据你的项目的测试结果为准。初步的测试表明只有在大并发下才有提升。设置连接池的一个不良后果就是,同一个memcached的连接之间的数据更新并非同步的,因此你的应用需要自己保证数据更新的原子性(采用CAS或者数据之间毫无关联)。
6、
简化构建,移除ant构建,简化maven构建,现在只采用maven构建了。借助于
wagon-svn这个扩展,可以将svn作为maven仓库,因此xmemcached的构建现在变的非常方便,下载源码后敲入mvn package即可。
7.
升级yanf4j到1.0-SNAPSHOT版本,此版本引入了SocketOption类,方便设置socket选项,并为引入aio做了重构。
8、从1.1.3和1.2.0-beta以来的bug fixed.
欢迎使用和建议。
下载地址:
http://code.google.com/p/xmemcached/downloads/list
XMemcached的结构方面的文档比较少,可能对有兴趣了解它的基本结构,或者想读源码的朋友入手比较困难。画了两张UML图,一张是主要的类图,描述了主要的类和接口之间的关系和结构。一张是序列图,一次典型的get操作需要经过什么步骤。
首先看类图,没有什么需要特别说明的。
再看一下get操作的序列图,需要注意的是等待响应的过程是异步的。
推迟了半个月之后,发布
xmemcached-1.2.0的beta测试版本,此版本又是一个里程碑版本,主要亮点如下:
1、支持全部的二进制协议,包括noreply的二进制协议。memcached 1.4.0正式推出memcached的二进制协议,相比于文本协议,二进制协议更复杂,但是也更容易解析和编码,并且可扩展性也比较强,比如原来文本协议只允许key为String类型,二进制协议允许key是任意类型,并且长度可以达到2^16-1,大大超过原有的255的限制。另一方面,文本协议的可读性更好,在不同上平台上实现也比较容易,而二进制协议就可能需要考虑可移植性的问题。
xmemcached支持全部二进制协议后才算是一个比较完整的memcached的java客户端了。在实现上可能还有一些隐藏的BUG和问题,欢迎试用并反馈,注意,如果使用二进制协议,你的memcached版本是必须是最新的1.4.0。
如果要使用二进制协议,你只需要添加一行代码:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(servers));
//添加下面这行,采用BinaryCommandFactory即可使用二进制协议
builder.setCommandFactory(new BinaryCommandFactory());
MemcachedClient mc = builder.build();
2、支持
hibernate-memcached,在某用户的要求下添加了此特性。
hibernate-memcached允许你使用memcached作为hibernate的二级缓存,但是它默认使用的是Spymemcached,想替换成Xmemcached就需要做一些扩展,在1.2.0提供了这一支持。你需要做的是将memcacheClientFactory属性设置为Xmemcached的即可:
hibernate.memcached.memcacheClientFactory=net.rubyeye.xmemcached.utils.hibernate.XmemcachedClientFactory
更多设置参考
wiki page.
3、1.1.3以来的一些bug fixed.
项目主页:
http://code.google.com/p/xmemcached/
下载地址:
http://code.google.com/p/xmemcached/downloads/list
在这里要推荐下《观止-微软创建NT和未来的夺命狂奔》,非常精彩,讲述了windows NT开发过程中的人和事。这不仅仅是故事书,也可以看做一本项目管理方面的指南,可以看看这么巨大的项目(几百万行代码)所遭遇到的难题和痛楚。我更愿意将这本书当做《人月神话》的故事版,同样是创建划时代的OS,同样是管理众多人参与的大型的项目,也同样遭遇了种种困扰和痛苦,从这个角度也可以看出,人类的痛苦的相通的:)
单纯从软件构建的角度去看这本书,可以说说我看到的东西,这些是我今天早上走在上班路上的时候想的,咳咳。
1、开发OS是烧钱的事情,NT开发接近5年,每年的花费据说在5000万美刀,那可是在90年代初期,换算成现在更是天文数字。从另一个侧面也说明了linux系统的伟大。开发一个这么烧钱的玩意,如果没有管理层的强力支持,那么不是被砍掉,就是遭遇流产的命运,幸运的是NT团队得到了盖茨的鼎力支持,大概也只有他能这么烧钱了。Dave Culter从DEC辞职的原因也是因为管理层砍掉了他的团队。盖茨另一个做法是不干涉NT团队的开发工作,他只提出目标和期望,然后就偶尔过来看看,不对不知道的东西指手画脚,这点可不容易。
2、每日构建非常重要,NT团队的构建实验室一开始是每周构建,后来做到了每日构建。只有每日构建,持续集成,才能帮你掌控产品质量,及时发现潜在的问题。我们现在的项目使用了hudson,比CC容易配置一点,效果还不错。
3、测试极其重要,专业的测试团队对于大型项目来说尤其重要。除了测试人员之外,开发人员需要做自测,需要对自己check-in的代码负责,如果你签入的代码导致构建失败,那么Dave culter可能冲破墙壁进来,拍着桌子冲你咆哮。对check in必须做严格控制和跟踪,如果在项目的最后冲击阶段,除了showstopper级别的修正代码允许签入之外,其他的修改都不被接受。开发者和测试人员很容易存在对立,检讨自己,我对测试人员也存在偏见和某种程度上的轻视和厌烦,如果从就事论事和都是为一个目标努力的角度来说,测试和开发并不对立,两者是相辅相成,甚至于测试人员更为至关重要。
4、在一个长期而复杂的项目中,如何保持团队成员的士气也是个难事儿。软件开发归根到底是的因素是人,而非工具或者其他,关注人,其实就是在关注你的软件。鼓励士气的常见做法就是设定里程碑,在这个里程碑上发布一个重要版本,让大家看到希望,但是对于OS这样的巨型项目来说,里程碑不是那么容易设定,这从书中项目的不断延期可以看到。另外就是宽松的工作环境和假期,微软的工作环境有目共睹,能做到每个员工独立一个办公室的国内企业还没有吧。国外的开发者似乎很会玩,赛车、滑雪、空手道,其实不是我们不会玩,是我们玩不起,国内的待遇和生活压力让你想玩也玩不起。
可是就算是再好的物质待遇,其实也换不来美好生活,书中充斥着开发者对家庭和婚姻的困惑和痛苦,为了NT,他们也失去了很多,对工作过度投入的后果就是失去平衡的家庭生活,再次验证上帝是公平的,有得必有失,就看你看重的是什么。
5、开发者的效率差异是惊人的,在《人月神话》里已经说明了这一点,开发者之间的效率差异可以达到惊人的10倍,在NT这样的团队里也再次验证了这一结论。
6、投入越多的人力,并不能带来效率的提升,当NTFS文件系统的进度拖慢的时候,微软的经理们考虑添加人手,但是经过慎重的考虑还是没有加人,因为文件系统是技术活,新人很难马上投入开发,而需要老手的带领和培训,引入了更多的沟通成本和培训成本。
7、优秀的代码无法通过行数来衡量,软件某种程度上还真是魔法的产物。
8、NT的一个教训是,应该及早设定你的性能目标,并在适当时候开始关注并优化系统。NT团队后期的很大部分工作都是在优化系统性能,并缩小尺寸。
9、设定Deadline常常是不靠谱的事情,对软件开发的时间估计也常常是不靠谱的事情,这一点从NT的一次又一次的延期可以看出。延期失望的不仅仅是客户,也会打击你的团队成员,遥遥无期的开发过程容易让人崩溃。
10、NT的开发贯穿了对市场的需求的考虑,有个牛X的产品经理还是相当重要的。当然,没有开发者喜欢添加新功能,特别是在已经完成一个新功能的情况下,以至发展到NT的开发者看到产品经理就不由得拿起球棒击墙的地步:)
这本书花了我两个晚上看完,还是看故事有趣呀,上面所说只是我的印象,书中还有许多八卦故事老少咸宜,如果有出入,请看原著:) 有空还得重读下。
1、孩子刚出生一两个月内是睡觉的时间多,清醒的时间少,这是很正常的现象,不用担心。小毅一开始也是这样狂睡,我和他妈妈还有点担心,后来通过查看一些资料才知道这是正常现象。睡的多,长的快。现在7个月了,睡的少了,越来越会闹腾。
2、孩子出生后24小后出现新生儿黄疸,这也是正常现象,也就是脸色比较发黄,医生会做检测,含量在合理范围内就没有问题,两三天内就会推下去,如果超出正常范围,需要住院治疗。小毅那时候检测出来的含量偏高,本来担心还要住院,后来还是退下去了。
3、孩子出生后,应该尽快让他尝试吃奶,母乳比奶粉好多了。
4、孩子在6个月内,腿脚看起来都是弯的,也就是看起来好像会罗圈腿,其实这也是在正常现象,只要注意不要让孩子过早的站立增加腿的负担之外,在6个月之后会慢慢转为正常。不过小毅太喜欢站了,还是有点担心了,准备观察观察,现在他都能自己借助外物从坐到站。孩子补钙还是需要的,推荐冲剂类的补钙产品,小毅用的是爱尔钙,比较酸,还能接受。补钙不是吃东西就完事儿,每天晒一些太阳还是必要的。
5、孩子感冒,不要太紧张,如果是没有超过38度的低烧,先用洗热水澡的方式来降温,洗完热水澡通常会降下来,如果再次上升,采取同样手段,注意保暖。但是也不是衣服穿的越多越好。如果温度高于38.5,强烈建议上医院找专业大夫。但是不要盲目给孩子开药,低烧完全可以通过物理降温,小毅到现在就发烧过一次,他妈妈和爷爷太着急,给他开了药,我知道后“愤怒”地批评了他妈妈,38度以下的低烧物理降温就可以了,让他们停了药,按照我说的去做,没什么问题。当然,不仅仅要靠物理降温,也要观察孩子的精神好不好,精神比较打蔫,跟平常不一样,那还是上医院保险。
6、注意给孩子体检,6个月至少去4次,看看体重和身高是否在正常范围内。小毅同学的体重和身高都还不错,妈妈功劳巨大。
7、孩子的防疫计划是要按照通知去打的,千万不能忘。
8、孩子咳嗽不能小瞧,可能是呼吸道感染或者轻微感冒,如果不是很厉害,可以考虑食疗。风寒喝姜汤,风热榨梨汁给他喝。
9、孩子从5个月开始要吃蛋黄了,这是因为母乳和奶粉提供的铁元素不够了,蛋黄从1/8到一个,逐步添加。在6,7个月的时候,可以考虑将鸡蛋做成鸡蛋羹给他吃。小毅吃饭很闹腾,吃个饭也要又蹦又跳,几个人围着他转,无语啊,小皇帝。
10、孩子从2,3个月开始,最好开始榨一些新鲜果汁给他喝,我儿子很喜欢喝苹果汁和西瓜汁。
11、我儿子6个多月就开始会爬了,这时候要特别小心了,不要让孩子一个人呆在床上,他爬下床就得摔一跤了。小毅就摔了一次,他妈妈洗衣服,将他一个人放床上,没想到他会爬,还爬到床下去了:) 听他妈妈说,脑袋红了一块,哭了几分钟就没事儿了。要注意,如果摔了,不会哭起来,赶紧送医院。
暂时想到这些。。。待补充。
最近一直在观察一个很有趣的现象,在国米超级杯失利后,魔力鸟同学发表的刺激“愤愤”的言论之后各大媒体网站的有趣新闻,这些新闻只是进一步验证了中国足球妓者的不专业,同样也让某些90后球迷兴奋地难以言喻,为什么说他们90后?因为只有脑残到一定程度才会为了这么一场无关宏旨的比赛意淫成那样。看看性浪网站的意假专题,AC米兰是祖国江山一片好,而国米的新闻却是凄凄惨惨切切,实际呢,AC米兰今年能进前4就知足吧,国米今年联赛冠军照拿不误,当然宇宙无敌王朝队的超超超新星太多了,30岁以上的新星暂时都不要了。魔力鸟同学捅了马蜂窝,说了大实话,中国就没一个专业的足球妓者嘛,除了编造新闻(以《体坛》最牛)、写写花边新闻、跟米卢合影上床(仅限女妓者)之外就不会干点别的,可魔力鸟同学不干了,不仅不合影,还说了实话,还不让足球妓者们提不专业的问题,你还不让不让足球妓者们活呀,全国的足球妓者们统一起来,全国的足球妓者们万岁,打倒魔力鸟主义,打倒国际米兰!
周六参加了公司的半年会,这是我第一次参加年会,2000多号人将学校的礼堂做的满满,舞台也搞的相当华丽。华丽之外的几点感受:
1、做人,做事,在公司上班工作,都需要使命感和责任感,一家没有使命感的公司是注定做不长久的。对一份工作没有使命感和热情,也是注定做不长久的。
2、我们总是做着自以为帮助别人的同时深深地伤害别人。成人们在感动小女孩的遭遇的同时,也在不断扯裂她心里的伤疤。一个孩子懂事早熟,这不是什么幸事,而是悲哀。什么时候大人们能不那么粗暴地强加自己的视角给孩子们?
3、老马果然很能说。
4、参加义工活动还是很有意义的事情,尽管我觉的形式化了。
5、在两千人面前求婚,事实证明不是那么靠谱的事情。
豆瓣是我经常上的网站,我在上面维护我的读书列表、电影列表和照片,平常有什么想说,偶尔也去那广播吹水,再加上一帮同好在那,因此逛豆瓣也成了我每天的习惯。当三表说豆瓣是脑残的时候,俺心里还提它辩解了下,要说他的推荐功能确实非常强大,而且能有这么一个维护读书计划的地方还是很不错的。
不过豆瓣最近的行为,让我越来越觉它脑残,理由如下:
一、删除照片和删除日志,从来就是一封冷冰冰的邮件通知,“你的XX图片因不符合豆瓣的图片策略已被删除”。我想去看看是哪张照片,但因为照片太多,却不知道是删的是哪张了。你丫就不会提供一个回收站啊,或者自动隐藏你认为有问题的照片,你丫删得干脆,用户就郁闷了,想知道原因的话,人家豆瓣就是公务猿,一问三不知。
二、动不动就他妈地搞什么“此功能暂停使用”,经常不允许使用广播,不允许写日志。我知道不是技术问题,我知道你们迫不得已,但是你们就不知道改变策略,搞个敏感词过滤或者隐藏五毛们要求删除的日志或者广播,而不是停掉功能。靠,你一个网站连正常的功能都提供不了,我上豆瓣干嘛?奸尸啊我。
三、广告用户越来越多,经常是一些注册没超过一个月,活动记录没超过两条的在书评或者影评蹦跶,将一本垃圾书(比如《敏捷无敌》)或者垃圾电影(比如《赤壁》)夸的跟天上的星星一样亮晶晶。更甚者是到处加好友,加完了就不断邀请你参加一些脑残的广告活动,烦不胜烦。这个现象的出现一方面说明豆瓣越来越受人关注的,另一方面就不能搞个防骚扰机制?并且对评价结果也应该能做出自动修正,对一些广告账户的评价应该做极低权重处理。为什么是极低?你没看到吗,在某部电影的上映档期内,一大群马甲可以把他评的跟《教父》一样,大大误导广大人民群众。
在多进程、多线程并发的环境里,从概念上看,有多个进程或者多个线程在同时执行,具体到单个CPU级别,实际上任何时刻只能有一个进程或者线程处于执行状态;因此OS需要决定哪个进程执行,哪些进程等待,也就是进程的调度。
一、调度的目标
1、首先要区分程序使用CPU的三种模式:IO密集型、计算密集型和平衡型。对于IO密集型程序来说,响应时间非常重要;对于CPU密集型来说,CPU的周转时间就比较重要;对于平衡型程序来说,响应和周转之间的平衡是最重要的。
2、CPU的调度就是要达到极小化平均响应时间、极大化系统吞吐率、保持系统各个功能部件均处于繁忙状态和提供某种公平的机制。
3、对于实时系统来说,调度的目标就是要达到截止时间前完成所应该完成的任务和提供性能的可预测性。
二、调度算法
1、FCFS(First come first serve),或者称为FIFO算法,先来先处理。这个算法的优点是简单,实现容易,并且似乎公平;缺点在于短的任务有可能变的非常慢,因为其前面的任务占用很长时间,造成了平均响应时间非常慢。
2、时间片轮询算法,这是对FIFO算法的改进,目的是改善短程序(运行时间短)的响应时间,其方法就是周期性地进行进程切换。这个算法的关键点在于时间片的选择,时间片过大,那么轮转就越接近FIFO,如果太小,进程切换的开销大于执行程序的开销,从而降低了系统效率。因此选择合适的时间片就非常重要。选择时间片的两个需要考虑的因素:一次进程切换所使用的系统消耗以及我们能接受的整个系统消耗、系统运行的进程数。
时间片轮询看上起非常公平,并且响应时间非常好,然而时间片轮转并不能保证系统的响应时间总是比FIFO短,这很大程度上取决于时间片大小的选择,以及这个大小与进程运行时间的相互关系。
3、STCF算法(Short time to complete first),顾名思义就是短任务优先算法。这种算法的核心就是所有的程序都有一个优先级,短任务的优先级比长任务的高,而OS总是安排优先级高的进程运行。
STCF又分为两类:非抢占式和抢占式。非抢占式STCF就是让已经在CPU上运行的程序执行到结束或者阻塞,然后在所有的就绪进程中选择执行时间最短的来执行;而抢占式STCF就不是这样,在每进来一个新的进程时,就对所有进程(包括正在CPU上执行的进程)进行检查,谁的执行时间短,就运行谁。
STCF总是能提供最优的响应时间,然而它也有缺点,第一可能造成长任务的程序无法得到CPU时间而饥饿,因为OS总是优先执行短任务;其次,关键问题在于我们怎么知道程序的运行时间,怎么预测某个进程需要的执行时间?通常有两个办法:使用启发式方法估算(例如根据程序大小估算),或者将程序执行一遍后记录其所用的CPU时间,在以后的执行过程中就可以根据这个测量数据来进行STCF调度。
4、优先级调度,STCF遇到的问题是长任务的程序可能饥饿,那么优先级调度算法可以通过给长任务的进程更高的优先级来解决这个问题;优先级调度遇到的问题可能是短任务的进程饥饿,这个可以通过动态调整优先级来解决。实际上动态调整优先级(称为权值)+时间片轮询的策略正是linux的进程调度策略之一的 SCHED_OTHER分时调度策略,它的调度过程如下:
(1)创建任务指定采用分时调度策略,并指定优先级nice值(-20~19)。
(2)将根据每个任务的nice值确定在cpu上的执行时间(counter)。
(3)如果没有等待资源,则将该任务加入到就绪队列中。
(4)调度程序遍历就绪队列中的任务,通过对每个任务动态优先级的计算(counter+20-nice)结果,选择计算结果最大的一个去运行,当这个时间片用完后(counter减至0)或者主动放弃cpu时,该任务将被放在就绪队列末尾(时间片用完)或等待队列(因等待资源而放弃cpu)中。
(5)此时调度程序重复上面计算过程,转到第4步。
(6)当调度程序发现所有就绪任务计算所得的权值都为不大于0时,重复第2步。
linux还有两个实时进程的调度策略:FIFO和RR,实时进程会立即抢占非实时进程。
5、显然,没有什么调度算法是毫无缺点的,因此现代OS通常都会采用混合调度算法。例如将不同的进程分为几个大类,每个大类有不同的优先级,不同大类的进程的调度取决于大类的优先级,同一个大类的进程采用时间片轮询来保证公平性。
6、其他调度算法,保证调度算法保证每个进程享用的CPU时间完全一样;彩票调度算法是一种概率调度算法,通过给进程“发彩票”的多少,来赋予不同进程不同的调用时间,彩票调度算法的优点是非常灵活,如果你给短任务发更多“彩票”,那么就类似STCF调度,如果给每个进程一样多的“彩票”,那么就类似保证调度;用户公平调度算法,是按照每个用户,而不是按照每个进程来进行公平分配CPU时间,这是为了防止贪婪用户启用了过多进程导致系统效率降低甚至停顿。
7、实时系统的调度算法,实时系统需要考虑每个具体任务的响应时间必须符合要求,在截止时间前完成。
(1)EDF调度算法,就是最早截止任务优先(Earliest deadline first)算法,也就是让最早截止的任务先做。当新的任务过来时,如果它的截止时间更靠前,那么就让新任务抢占正在执行的任务。EDF算法其实是贪心算法的一种体现。如果一组任务可以被调度(也就是所有任务的截止时间在理论上都可以得到满足),那么EDF可以满足。如果一批任务不能全部满足(全部在各自的截止时间前完成),那EDF满足的任务数最多,这就是它最优的体现。EDF其实就是抢占式的STCF,只不过将程序的执行时间换成了截止时间。EDF的缺点在于需要对每个任务的截止时间做计算并动态调整优先级,并且抢占任务也需要消耗系统资源。因此它的实际效果比理论效果差一点。
(2)RMS调度算法,EDF是动态调度算法,而RMS(rate monotonic scheduling)算法是一种静态最优算法;该算法在进行调度前先计算出所有任务的优先级,然后按照计算出来的优先级进行调度,任务执行中间既不接收新任务,也不进行优先级调整或者CPU抢占。因此它的优点是系统消耗小,缺点就是不灵活了。对于RMS算法,关键点在于判断一个任务组是否能被调度,这里有一个定律,如果一个系统的所有任务的CPU利用率都低于ln2,那么这些任务的截止时间均可以得到满足,ln2约等于0.693147,也就是此时系统还剩下有30%的CPU时间。这个证明是Liu和Kayland在1973年给出的。
三、优先级反转
1、什么是优先级反转?
优先级反转是指一个低优先级的任务持有一个被高优先级任务所需要的共享资源。高优先任务由于因资源缺乏而处于受阻状态,一直等到低优先级任务释放资源为止。而低优先级获得的CPU时间少,如果此时有优先级处于两者之间的任务,并且不需要那个共享资源,则该中优先级的任务反而超过这两个任务而获得CPU时间。如果高优先级等待资源时不是阻塞等待,而是忙循环,则可能永远无法获得资源,因为此时低优先级进程无法与高优先级进程争夺CPU时间,从而无法执行,进而无法释放资源,造成的后果就是高优先级任务无法获得资源而继续推进。
2、解决方案:
(1)设置优先级上限,给临界区一个高优先级,进入临界区的进程都将获得这个高优先级,如果其他试图进入临界区的进程的优先级都低于这个高优先级,那么优先级反转就不会发生。
(2)优先级继承,当一个高优先级进程等待一个低优先级进程持有的资源时,低优先级进程将暂时获得高优先级进程的优先级别,在释放共享资源后,低优先级进程回到原来的优先级别。嵌入式系统VxWorks就是采用这种策略。
这里还有一个八卦,1997年的美国的火星探测器(使用的就是vxworks)就遇到一个优先级反转问题引起的故障。简单说下,火星探测器有一个信息总线,有一个高优先级的总线任务负责总线数据的存取,访问总线都需要通过一个互斥锁(共享资源出现了);还有一个低优先级的,运行不是很频繁的气象搜集任务,它需要对总线写数据,也就同样需要访问互斥锁;最后还有一个中优先级的通信任务,它的运行时间比较长。平常这个系统运行毫无问题,但是有一天,在气象任务获得互斥锁往总线写数据的时候,一个中断发生导致通信任务被调度就绪,通信任务抢占了低优先级的气象任务,而无巧不成书的是,此时高优先级的总线任务正在等待气象任务写完数据归还互斥锁,但是由于通信任务抢占了CPU并且运行时间比较长,导致气象任务得不到CPU时间也无法释放互斥锁,本来是高优先级的总线任务也无法执行,总线任务无法及时执行的后果被探路者认为是一个严重错误,最后就是整个系统被重启。Vxworks允许优先级继承,然而遗憾的工程师们将这个选项关闭了。
(3)第三种方法就是使用中断禁止,通过禁止中断来保护临界区,采用此种策略的系统只有两种优先级:可抢占优先级和中断禁止优先级。前者为一般进程运行时的优先级,后者为运行于临界区的优先级。火星探路者正是由于在临界区中运行的气象任务被中断发生的通信任务所抢占才导致故障,如果有临界区的禁止中断保护,此一问题也不会发生。
最近工作上是在处理一个线程安全的问题,如何保证对某个资源的访问是独占的,不会有并发的隐患。在此过程中接了
checkthread,一个线程安全的静态分析工具,通过annotation标记在编译期检查可能的并发隐患,提供了一个eclipse插件,有兴趣可以看看他的 example。这个东西有个最不好的地方就是要依赖他的自定义annotation,如果不介意的话还是不错的选择,一些常见的隐患都能够标示出来。
另外就是去了解了下netty3,jboss的子项目,netty2和mina作者的另一个作品,关注它是因为在他的benchmark中,netty3的表现很优秀,可以看这篇报告《
Performance comparision between java nio framework》。大概看了下他的设计思路,其实与mina并无多大区别,不过使用了一些有意思的trick,例如对于write的处理,一般的nio框架都是放入一个队列,然后注册写事件(队列为空的时候),等待写,写通常也是单线程去写。而netty3做了一个优化,发送消息时同样有一个队列,在放入队列后判断当前是否正在写循环中,如果正在写,那么就注册一个WriteTask唤醒selector等待写;如果没有,那么发送线程立即就去执行这个写操作,这里的一个好处是少了两个开销:注册事件等待触发以及线程切换。Selector.wakeup的操作是比较昂贵的,netty3也做了优化。更多东西等待探索。
山寨nio框架yanf4j已经挺久没有做出任何改进,这次索性将很多过去考虑不成熟、实践中证明不必要的代码删除和简化,然后做了个与mina 2.0 -M5的性能对比,采用的netty3作者的benchmark源码,yanf4j的Echo Server如下:

Echo Server
import java.nio.ByteBuffer;
import org.jboss.netty.benchmark.echo.Constant;
import com.google.code.yanf4j.config.Configuration;
import com.google.code.yanf4j.nio.Session;
import com.google.code.yanf4j.nio.TCPController;
import com.google.code.yanf4j.nio.impl.HandlerAdapter;
public class Yanf4j {
public static void main(String[] args) throws Exception {
boolean threadPoolDisabled = args.length > 0
&& args[0].equals("nothreadpool");
Configuration configuration = new Configuration();
configuration.setCheckSessionTimeoutInterval(0);
configuration.setSessionIdleTimeout(0);
configuration.setTcpNoDelay(true);
configuration.setReadThreadCount(Runtime.getRuntime()
.availableProcessors());
configuration.setPort(Constant.PORT);
configuration
.setSessionReadBufferSize(Constant.INITIAL_READ_BUFFER_SIZE);
TCPController controller = new TCPController(configuration);
controller.setHandler(new EchoHandler());
if (!threadPoolDisabled) {
controller
.setDispatchMessageThreadPoolSize(Constant.THREAD_POOL_SIZE);
}
controller.start();
System.out.println("Yanf4j EchoServer is ready to serve at port "
+ Constant.PORT + ".");
System.out
.println("Enter 'ant benchmark' on the client side to begin.");
System.out.println("Thread pool: "
+ (threadPoolDisabled ? "DISABLED" : "ENABLED"));
}
static class EchoHandler extends HandlerAdapter {
@Override
public void onReceive(final Session session, final Object msg) {
session.send(((ByteBuffer) msg).duplicate());
}
@Override
public void onException(Session session, Throwable t) {
session.close();
}
}
}
最后的分析报表,可以看到yanf4j的性能与mina2的性能相近,不过mina在内存使用上非常狠。此外,Xmemcached 1.1.3 将采用最新的yanf4j 0.7.0。
(横坐标是并发连接数,纵坐标是吞吐量,单位为M/s,测试JDK为1.6.4,具体硬件环境不再详细列出,与xmemcached的benchmark同)
四张图分别是在消息长度为64、256、1024、4096字节下的对比。
XMemcached发布1.1.2版本,这一版本仍然是1.1.0版本以来的改进版本,主要的改进如下:
1.支持设置memcached
节点权重,权重高的负载相应比较大。
2.为部分协议添加
noreply选项,memcached 1.2.5引入了noreply支持,部分文本协议(如存储,删除,incr/decr等)允许附加设置一个noreply,表示客户端不要求memcached应答。这一特性利于批量处理。
3.支持与
spring框架的集成。
4.添加
verbosity协议,这个协议用于让客户端设置memcached的日志输出级别。
5.一些细节改进。XMemcached从0.5开始就有重连机制,在连接意外断开的情况下会不断地自动重连,不过间隔是10秒,现在改成将间隔缩小为0秒以便客户端能及时连接。改进了JMX支持,可以通过JMX查看节点权重和动态设置节点权重。
6.BUG修复,包括:Issue 35、Issue 36、Issue 37、Issue 38等,具体请看
这里
7.去除了对spy-2.4.jar依赖,现在序列化部分已经不再需要spymemcached的这个jar包。
项目主页:
http://code.google.com/p/xmemcached/
下载地址:
http://code.google.com/p/xmemcached/downloads/list
wiki地址:
http://code.google.com/p/xmemcached/w/list
下面是关于特性的详细说明,首先是权重的使用,看例子:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("localhost:12000 localhost:12001"),new int[]{1,3});
MemcachedClient memcachedClient=builder.build();
现在的
XMemcachedClientBuilder允许传入了两个参数,一个是InetSocketAddress组成的列表,一个是权重的数组,权重数组的元素与列表中的地址一一对应,例如这里就是将"localhost:12000"节点的权重设置为1,而将"localhost:12001"的权重设置为3。同样在XMemcachedClientMBean中添加了两个新的方法:
public void addOneServerWithWeight(String server, int weight)
throws IOException;
/**
* Set a memcached server's weight
*
* @param server
* @param weight
*/
public void setServerWeight(String server, int weight);
用于动态添加和修改节点的权重。
其次,为了支持
noreply选项,MemcachedClient接口引入了系列xxxWithNoReply方法,例如
public abstract void setWithNoReply(final String key, final int exp,
final Object value) throws InterruptedException, MemcachedException;
public abstract <T> void setWithNoReply(final String key, final int exp,
final T value, final Transcoder<T> transcoder)
throws InterruptedException, MemcachedException;
public abstract void addWithNoReply(final String key, final int exp,
final Object value) throws InterruptedException, MemcachedException;
public abstract void replaceWithNoReply(final String key, final int exp,
final Object value) throws InterruptedException, MemcachedException;
public void deleteWithNoReply(final String key)
throws InterruptedException, MemcachedException;
完整的列表请看changelog.txt, noreply系列方法非常适合于批量处理,比之需要等待memcached应答的效率上提升很多。
第三,与spring的集成,通过XMemcachedClientFactoryBean可以很方便地与spring框架集成,最简单的配置如下:
<bean name="memcachedClient"
class="net.rubyeye.xmemcached.utils.XMemcachedClientFactoryBean">
<property name="servers">
<value>localhost:12000 localhost:12001</value>
</property>
</bean>
只要设置servers属性,那么就可以在任何需要的地方引用memcachedClient这个Bean.更完整的配置参考
wiki
第四,引入了对verbosity协议的支持,通过两个新方法:
public void setLoggingLevelVerbosity(InetSocketAddress address, int level)
throws TimeoutException, InterruptedException, MemcachedException;
public void setLoggingLevelVerbosityWithNoReply(InetSocketAddress address,
int level) throws InterruptedException, MemcachedException;
其中的level就是日志级别。请注意你的memcached版本是否支持这一协议。
1.1.2是一个承前启后的版本,按俺的计划应该还有个1.1.3(专注性能改进和优化),之后才是实现了二进制协议的1.2.0。俺非常希望能有任何人给出任何建议和bug反馈。
在JavaMemCached这个memacched客户端,如果你有多个memcachd节点,你可以设置memcached server的权重,权重高的节点在存储、获取等操作就相应占的比重比较大。恰巧我最近也在实现一个类似这样流控的东西,因此在xmemcached实现了此feature。这个功能暂定在1.2.0的时候发布,但是现在已经可以从svn获取,只是你需要自己build。
使用方法,与通常调用的唯一区别就是在创建MemcachedClient的时候,
MemcachedClientBuilder builder = new XMemcachedClientBuilder (AddrUtil.getAddresses("localhost:12000 localhost:12001"),new int[]{1,3});
MemcachedClient memcachedClient = builder.build();
XMemcachedClientBuilder新增一个重载构造函数,除了传入地址列表之外,还可以传入一个权重数组表示列表中的memcached节点权重,权重数组与地址列表一一对应。这里将localhost:12001的权重设为3,而localhost:12000的权重设置为1。 如果没有提供权重值,默认都是为1。这个feature已经进行了测试,在随机化测试下完全符合比例要求。这一feature对于是使用标准哈希,还是一致性哈希都有效。
实现原理是添加weight次相同的session存储在session查找集合里,但是注意这里仍然是只有一个连接的,只是在集合里存储了这个连接的多份引用,那么在查找session的过程中,找到权重大(引用多)的连接的几率相应就比较大。
5.1 图就不画在机器上了,麻烦
5.2 用寄存器语言描述5.1题中的阶乘机器,加上了读取和打印,这里的解答全部在实际的寄存机器中验证过,但是仍然按照该节的表示法表示。
(controller
fac-loop
(assign n (op read))
(assign product (const 1))
(assign counter (const 1))
iter-loop
(test (op >) (reg counter) (reg n))
(branch (label iter-done))
(assign product (op *) (reg product) (reg counter))
(assign counter (op +) (reg counter) (const 1))
(goto (label iter-loop))
iter-done
(perform (op print) (reg product))
(goto (label fac-loop)))
5.3 牛顿法求平方根,将这个过程转化为寄存器语言,第一个版本,假设good-enough?和improve都是基本过程,
;version1
(controller
sqrt-loop
(test (op good-enough?) (reg guess))
(branch (label sqrt-done))
(assign guess (op improve) (reg guess))
(goto (label good-enough))
sqrt-done)
第二个版本,展开good-enough?过程,
;version2
(controller
good-enough
(assign t (op square) (reg guess))
(assign t (op -) (reg t) (reg x))
(assign t (op abs) (reg t))
(test (op <) (reg t) (const 0.001))
(branch (label sqrt-done))
(assign guess (op improve) (reg guess))
(goto (label good-enough))
sqrt-done)
最后,展开improve过程,
;version3
(controller
sqrt-init
(assign guess (const 1.0))
(assign x (op read))
good-enough
;good-enough
(assign t (op square) (reg guess))
(assign t (op -) (reg t) (reg x))
(assign t (op abs) (reg t))
(test (op <) (reg t) (const 0.001))
(branch (label sqrt-done))
;improve
(assign t (op /) (reg x) (reg guess))
(assign t (op +) (reg guess) (reg t))
(assign guess (op /) (reg t) (const 2.0))
(goto (label good-enough))
sqrt-done)
在start之后,从寄存器guess中得到最后结果。
5.4
a)第一个是一个指数计算过程,用到了递归,因此需要引入continue寄存器来保存和恢复堆栈,实现与阶乘相似,如下
(controller
(assign continue (label expt-done))
expt-loop
(test (op =) (reg n) (const 0))
(branch (label expt-base-case))
;;保存continue
(save continue)
(assign n (op -) (reg n) (const 1))
(assign continue (label after-expt))
(goto (label expt-loop))
after-expt
;;恢复continue
(restore continue)
(assign val (op *) (reg b) (reg val))
(goto (reg continue))
expt-base-case
(assign val (const 1))
(goto (reg continue))
expt-done
(perform (op display) (reg val)))
b) 迭代型的递归计算过程,尾递归调用,因此不需要continue寄存器来保存和恢复堆栈,这道习题就是希望能分辨非尾递归和尾递归带来的寄存机器语言的区别
(controller
(assign product (const 1))
(assign n (op read))
(assign b (op read))
(assign counter (reg n))
expt-iter-loop
(test (op =) (reg counter) (const 0))
(branch (label expt-done))
(assign counter (op -) (reg counter) (const 1))
(assign product (op *) (reg b) (reg product))
(goto (label expt-iter-loop))
expt-done
(perform (op display) (reg product)))
5.5 手工模拟就算了,5.2节就可以机器模拟了
5.6 是有一个多余的save和一个多余的restore操作,请看注释:
(
(assign continue (label fib-done))
fib-loop
(test (op <) (reg n) (const 2))
(branch (label immediate-answer))
;;compute fib(n-1)
(save continue)
(assign continue (label after-fib-1))
(save n)
(assign n (op -) (reg n) (const 1))
(goto (label fib-loop))
after-fib-1
;;compute fib(n-2)
(restore n)
;这里多余,(restore continue)
(assign n (op -) (reg n) (const 2))
;这里多余,(save continue)
(assign continue (label after-fib-2))
(save val) ;;save fib(n-1)
(goto (label fib-loop))
after-fib-2
(assign n (reg val))
(restore val)
(restore continue)
(assign val (op +) (reg val) (reg n))
(goto (reg continue))
immediate-answer
(assign val (reg n))
(goto (reg continue))
fib-done)