netty3.2.3源码分析-ClientBootstrap启动分析

Posted on 2010-12-06 16:59 alex_zheng 阅读(3257) 评论(0)  编辑  收藏 所属分类: java
在看完了server端的启动,再来看client端的启动过程是怎么进行的。例子是TelnetServer对应的TelnetClient
public class TelnetClient {

    
public static void main(String[] args) throws Exception {
        
        ClientBootstrap bootstrap 
= new ClientBootstrap(
                
new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        
// Configure the pipeline factory.
        bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());

        
// Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

        
// Wait until the connection attempt succeeds or fails.
        Channel channel = future.awaitUninterruptibly().getChannel();
        
if (!future.isSuccess()) {
            future.getCause().printStackTrace();
            bootstrap.releaseExternalResources();
            
return;
        }

        
    }
}
直接看connect方法
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {

        
if (remoteAddress == null) {
            
throw new NullPointerException("remoteAddress");
        }

        ChannelPipeline pipeline;
        
try {
            pipeline 
= getPipelineFactory().getPipeline();
        } 
catch (Exception e) {
            
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
        }

        
// Set the options.
        
//NioClientSocketChannel构造函数中会触发channelopen
        
//TelnetClientPipelineFactory中的upstreamhandler没有重写channelOpen,这里只是一直往下传递该事件
        Channel ch = getFactory().newChannel(pipeline);
        ch.getConfig().setOptions(getOptions());

        
// Bind.
        if (localAddress != null) {
            ch.bind(localAddress);
        }

        
// Connect.
        return ch.connect(remoteAddress);
    }
然后执行ch.connect(remoteAddress);
这里是NioClientSocketChannel-->NioSocketChannel-->AbstractChannel
public ChannelFuture connect(SocketAddress remoteAddress) {
       
return Channels.connect(this, remoteAddress);
}

public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
        
if (remoteAddress == null) {
            
throw new NullPointerException("remoteAddress");
        }
        ChannelFuture future 
= future(channel, true);
        channel.getPipeline().sendDownstream(
new DownstreamChannelStateEvent(
                channel, future, ChannelState.CONNECTED, remoteAddress));
        
return future;
}

从TelnetClientPipelineFactory的pipeline中由下往上传递CONNECTED事件。这里只有一个downstream handler:StringEncoder-->OneToOneEncoder,其handleDownstream方法对该事件不做处理,只是继续往上传递该事件,于是执行DefaultChannelHandlerContext.sendDownstream
public void sendDownstream(ChannelEvent e) {
            
//在StringEncoder之前再没有downstreamhandler
            DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
            
if (prev == null) {
                
try {
                    getSink().eventSunk(DefaultChannelPipeline.
this, e);
                } 
catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } 
else {
                DefaultChannelPipeline.
this.sendDownstream(prev, e);
            }
        }
执行NioClientSocketPipelineSink.eventSunk,其中会执行
 private void connect(
            
final NioClientSocketChannel channel, final ChannelFuture cf,
            SocketAddress remoteAddress) {
        
try {
            
//如果返回true,调用nioworker.register,开始启动nioworker线程处理该channel的读写
            
//否则,交给boss.register方法,在boss线程中完成连接
            if (channel.socket.connect(remoteAddress)) {
                channel.worker.register(channel, cf);
            } 
else {
                
//为当前clientsocketchannel添加closed的listener
                channel.getCloseFuture().addListener(new ChannelFutureListener() {
                    
public void operationComplete(ChannelFuture f)
                            
throws Exception {
                        
if (!cf.isDone()) {
                            cf.setFailure(
new ClosedChannelException());
                        }
                    }
                });
                cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                channel.connectFuture 
= cf;
                
                boss.register(channel);
            }

        } 
catch (Throwable t) {
            cf.setFailure(t);
            fireExceptionCaught(channel, t);
            channel.worker.close(channel, succeededFuture(channel));
        }
    }

执行boss.register,在boss线程中确保该channel连接成功,这里会启动boss线程
void register(NioClientSocketChannel channel) {
            
//在RegisterTask的run方法里注册SelectionKey.OP_CONNECT
            Runnable registerTask = new RegisterTask(this, channel);
            
                
boolean offered = registerTaskQueue.offer(registerTask);
                
assert offered;
            }

            
if (wakenUp.compareAndSet(falsetrue)) {
                selector.wakeup();
            }
        }
最后启动boss.run,其中processSelectedKeys里执行connect
private void connect(SelectionKey k) {
            NioClientSocketChannel ch 
= (NioClientSocketChannel) k.attachment();
            
try {
                
if (ch.socket.finishConnect()) {
                    k.cancel();
                    
//连接成功,才在nioworker中启动一个新线程来处理该socketchannel的读写
                    ch.worker.register(ch, ch.connectFuture);
                }
            } 
catch (Throwable t) {
                ch.connectFuture.setFailure(t);
                fireExceptionCaught(ch, t);
                ch.worker.close(ch, succeededFuture(ch));
            }
        }

之后就是交给nioworker线程来进行数据的发送和接收了。

只有注册用户登录后才能发表评论。


网站导航:
 

posts - 10, comments - 9, trackbacks - 0, articles - 15

Copyright © alex_zheng