随笔-23  评论-58  文章-0  trackbacks-0

JAVA NIO 多线程服务器是 Nut (lucene + hadoop 分布式搜索运行框架)  Nut Search层封装代码


public interface Reactor 
{
    
void execute(SelectionKey key);
}


public class Reader implements Reactor 
{
    
private static final Log log = LogFactory.getLog(Reader.class);
    
    
private byte[] bytes=new byte[0];
    
private ExecutorService executor;
    
    
public Reader(ExecutorService executor)
    
{
        
this.executor=executor;
    }

    
    @Override
    
public void execute(SelectionKey key)
    
{
        SocketChannel sc 
= (SocketChannel) key.channel();

        
try
        
{
            ByteBuffer buffer
=ByteBuffer.allocate(1024);
            
int len=-1;
            
while((len=sc.read(buffer))>0)
            
{
                buffer.flip();
                  
byte [] content = new byte[buffer.limit()];
                buffer.get(content);
                bytes
=NutUtil.ArrayCoalition(bytes,content);
                buffer.clear();
            }

            
if(len==0)
                key.interestOps(SelectionKey.OP_READ);
            
else
            
{
                Callable
<byte[]> call=new ProcessCallable(bytes);
                Future
<byte[]> task=executor.submit(call);
                ByteBuffer output
=ByteBuffer.wrap(task.get());
                sc.register(key.selector(), SelectionKey.OP_WRITE, 
new Writer(output));
            }

        }

        
catch(Exception e)
        
{
            log.info(e);
        }

    }

}


public class Writer implements Reactor 
{
    
private static final Log log = LogFactory.getLog(Writer.class);
    
    
private ByteBuffer output;
    
    
public Writer(ByteBuffer output)
    
{
        
this.output=output;
    }

    
    
public void execute(SelectionKey key)
    
{
        SocketChannel sc 
= (SocketChannel) key.channel();
        
try
        
{
            
while(output.hasRemaining())
            
{
                
int len=sc.write(output);
                
if(len<0)
                

                    
throw new EOFException(); 
                }
 
                
if(len==0
                

                    key.interestOps(SelectionKey.OP_WRITE); 
                    key.selector().wakeup(); 
                    
break
                }

            }

            
if(!output.hasRemaining())
            
{
                output.clear();
                key.cancel();
                sc.close();
            }

        }

        
catch(IOException e)
        
{
            log.info(e);
        }

    }

}


public class MiniServer
{
    
private static final Log log = LogFactory.getLog(MiniServer.class);
    
    
private final Selector s;
    
private final ServerSocketChannel ssc;
    
private ExecutorService executor;
    
    
private static Map<String,Long> map=new TreeMap<String,Long>();//保存不能正确完成的SelectionKey
    private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
    
    
public MiniServer(int portnumber,ExecutorService executor) throws IOException
    
{
        scheduled.scheduleAtFixedRate(task,
10,10,TimeUnit.MINUTES);//每10分钟清空一次map
        this.executor=executor;
        s 
= Selector.open();
        ssc 
= ServerSocketChannel.open();
        ssc.socket().bind(
new InetSocketAddress(portnumber));
        ssc.configureBlocking(
false);
        ssc.register(s,SelectionKey.OP_ACCEPT);
    }

    
    
public void execute()
    
{
        
try
        
{
            
while(s.isOpen())
            
{
                
int nKeys=s.select();
                
if(nKeys==0)
                
{
                    
for (SelectionKey key : s.keys())
                    
{
                        log.info(
"channel " + key.channel() + " waiting for " + key.interestOps());
                        
//如果超过2分钟就废除
                        if(map.containsKey(key.toString()))
                        
{
                            Long t
= map.get(key.toString());
                            
if((NutUtil.now()-t)>200);
                            
{
                                map.remove(key.toString());
                                s.keys().remove(key);
                                key.cancel();
                            }

                        }

                        
else
                        
{
                            map.put(key.toString(), NutUtil.now());
                        }

                    }

                    
continue;
                }

                
                Iterator
<SelectionKey> it = s.selectedKeys().iterator();  
                
while (it.hasNext()) 
                
{
                    SelectionKey key 
= it.next();
                    it.remove();
                    
if (!key.isValid() || !key.channel().isOpen())
                        
continue;
                    
if(key.isAcceptable())
                    
{
                        SocketChannel sc 
= ssc.accept();
                        
if (sc != null)
                        
{
                            sc.configureBlocking(
false);
                            sc.register(s, SelectionKey.OP_READ, 
new Reader(executor));
                        }

                    }

                    
else if(key.isReadable()||key.isWritable())
                    
{
                        Reactor reactor 
= (Reactor) key.attachment();
                        reactor.execute(key);
                    }

                }

            }

        }

        
catch(IOException e)
        
{
            log.info(e);
        }

    }

    
    Runnable task 
= new Runnable()
    
{
        
public void run()
        
{
            map.clear();
        }

    }
;
}
posted on 2010-07-26 11:31 nianzai 阅读(2684) 评论(2)  编辑  收藏 所属分类: NIO

评论:
# re: JAVA NIO 多线程服务器 1.1版 2010-07-26 20:34 | intex充气床
谢谢!  回复  更多评论
  
# re: JAVA NIO 多线程服务器 1.1版 2013-02-02 16:11 | jnan77
ProcessCallable 这是什么包的呢  回复  更多评论
  

只有注册用户登录后才能发表评论。


网站导航: