庄周梦蝶

生活、程序、未来
   :: 首页 ::  ::  :: 聚合  :: 管理

Ruby Fiber指南(三)过滤器

Posted on 2010-03-11 23:49 dennis 阅读(4507) 评论(1)  编辑  收藏 所属分类: 动态语言my open-source

    Ruby Fiber指南(一)基础
    Ruby Fiber指南(二)参数传递
    Ruby Fiber指南(三)过滤器
    Ruby Fiber指南(四)迭代器
    Ruby Actor指南(五)实现Actor

     在学习了Fiber的基础知识之后,可以尝试用Fiber去做一些比较有趣的事情。这一节将讲述如何使用Fiber来实现类似unix系统中的管道功能。在unix系统中,可以通过管道将多个命令组合起来做一些强大的功能,最常用的例如查找所有的java进程:
ps aux|grep java
通过组合ps和grep命令来实现,ps的输出作为grep的输入,如果有更多的命令就形成了一条过滤链。过滤器本质上还是生产者和消费者模型,前一个过滤器产生结果,后一个过滤器消费这个结果并产生新的结果给下一个过滤器消费。因此我们就从最简单的生产者消费者模型实现说起。
我们要展示的这个例子场景是这样:生产者从标准输入读入用户输入并发送给消费者,消费者打印这个输入,整个程序是由消费者驱动的,消费者唤醒生存者去读用户输入,生产者读到输入后让出执行权给消费者去打印,整个过程通过生产者和消费者的协作完成。
生产者发送是通过yield返回用户输入给消费者(还记的上一节吗?):

def send(x)
  Fiber.
yield(x)
end

而消费者的接收则是通过唤醒生产者去生产:
def receive(prod)
  prod.resume
end

生产者是一个Fiber,它的任务就是等待用户输入并发送结果给消费者:
def producer()
  Fiber.new do
    
while true
      x
=readline.chomp
      send(x)
    end
  end
end

消费者负责驱动生产者,并且在接收到结果的时候打印,消费者是root fiber:
def consumer(producer)
  
while true
    x
=receive(producer)
    
break if x=='quit'
    puts x
  end
end

    最终的调用如下:
consumer(producer())
   完整的程序如下:

#生产者消费者
require 'fiber'

def send(x)
  Fiber.
yield(x)
end

def receive(prod)
  prod.resume
end

def producer()
  Fiber.new do
    
while true
      x
=readline.chomp
      send(x)
    end
  end
end


def consumer(producer)
  
while true
    x
=receive(producer)
    
break if x=='quit'
    puts x
  end
end
if $0==__FILE__
  consumer(producer())
end

   读者可以尝试在ruby1.9下运行这个程序,每次程序都由消费者驱动生产者去等待用户输入,用户输入任何东西之后回车,生产者开始运行并将读到的结果发送给消费者并让出执行权(通过yield),消费者在接收到yield返回的结果后打印这个结果,因此整个交互过程是一个echo的例子。

最终的调用consumer(producer())已经有过滤器的影子在了,如果我们希望在producer和consumer之间插入其他过程对用户的输入做处理,也就是安插过滤器,那么新的过滤器也将作为fiber存在,新的fiber消费producer的输出,并输出新的结果给消费者,例如我们希望将用户的输入结果加上行号再打印,那么就插入一个称为filter的fiber:
def filter(prod)
  
return Fiber.new do
    line
=1
    
while true
      value
=receive(prod)
      value
=sprintf("%5d %s",line,value)
      send(value)
      line
=line.succ
    end
  end
end

    最终组合的调用如下:
 consumer(filter(producer()))
   类似unix系统那样,简单的加入新的fiber组合起来就可以为打印结果添加行号。

类似consumer(filter(producer()))的调用方式尽管已经很直观,但是我们还是希望能像unix系统那样调用,也就是通过竖线作为管道操作符:
producer | filter | consumer
这样的调用方式更将透明直观,清楚地表明整个过滤器链的运行过程。幸运的是在Ruby中支持对|方法符的重载,因此要实现这样的操作符并非难事,只要对Fiber做一层封装即可,下面给出的代码来自《Programming ruby》的作者Dave Thomas的blog

class PipelineElement
   attr_accessor :source
   
def initialize
      @fiber_delegate 
= Fiber.new do
         process
      end
   end

   
def |(other)
      other.source 
= self
      other
   end

   
def resume
      @fiber_delegate.resume
   end

   
def process
      
while value = input
         handle_value(value)
      end
   end

   
def handle_value(value)
      output(value)
   end

   
def input
      @source.resume
   end

   
def output(value)
      Fiber.
yield(value)
   end
end

这段代码非常巧妙,将Fiber和Ruby的功能展示的淋漓尽致。大致解说下,PipelineElement作为任何一个过滤器的父类,其中封装了一个fiber,这个fiber默认执行process,在process方法中可以看到上面生产者和消费者例子的影子,input类似receive方法调用前置过滤器(source),output则将本过滤器处理的结果作为参数传递给yield并让出执行权,让这个过滤器的调用者(也就是后续过滤器)得到结果并继续处理。PipelineElement实现了“|”方法,用于组合过滤器,将下一个过滤器的前置过滤器设置为本过滤器,并返回下一个过滤器。整个过滤链的驱动者是最后一个过滤器。

有了这个封装,那么上面生产者消费者的例子可以改写为:
class Producer < PipelineElement
   
def process
     
while true
       value
=readline.chomp
       handle_value(value)
     end
   end
end

class Filter < PipelineElement
  
def initialize
    @line
=1
    super()
  end
  
def handle_value(value)
    value
=sprintf("%5d %s",@line,value)
    output(value)
    @line
=@line.succ
  end
end

class Consumer < PipelineElement
  
def handle_value(value)
    puts value
  end
end

   现在的调用方式可以跟unix的管道很像了:
producer=Producer.new
filter
=Filter.new
consumer
=Consumer.new

pipeline 
= producer | filter | consumer
pipeline.resume
如果你打印pipeline对象,你将看到一条清晰的过滤链,
#<Consumer:0x8f08bf4 @fiber_delegate=#<Fiber:0x8f08a88>, @source=#<Filter:0x8f08db4 @line=1, @fiber_delegate=#<Fiber:0x8f08d60>, @source=#<Producer:0x8f09054 @fiber_delegate=#<Fiber:0x8f09038>>>>



评论

# re: Ruby Fiber指南(三)过滤器[未登录]  回复  更多评论   

2010-03-15 12:21 by Zale-U
参考了!

只有注册用户登录后才能发表评论。


网站导航: