hengheng123456789

  BlogJava :: 首页 :: 联系 :: 聚合  :: 管理
  297 Posts :: 68 Stories :: 144 Comments :: 0 Trackbacks
 

MINA Beginning

http://mina.apache.org/

http://mina.apache.org/documentation.html

1.         传统Socket:阻塞式通信

java传统socket技术中,每建立一个Socket连接时,须同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。

这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。下面的代码就说明了这一点。

a)         server code:

package Socket;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.net.ServerSocket;

import java.net.Socket;

public class MultiUserServer extends Thread {

       private Socket client;

       public MultiUserServer(Socket c) {

              this.client = c;

       }

       public void run() {

              try {

                     BufferedReader in = new BufferedReader(new InputStreamReader(client

                                   .getInputStream()));

                     // Mutil User but can't parallel

                     while (true) {

                            String str = in.readLine();

                            System.out.println("receive message: " + str);

                            if (str.equals("end"))

                                   break;

                     }

                     client.close();

              } catch (IOException ex) {

              }

       }

       public static void main(String[] args) throws IOException {

              int port = 10086;

              if (args.length > 0)

                     port = Integer.parseInt(args[0]);

              ServerSocket server = new ServerSocket(port);

              System.out.println("the server socket application is created!");

              while (true) {

                     // transfer location change Single User or Multi User

                     MultiUserServer mu = new MultiUserServer(server.accept());

                     mu.start();

              }

       }

}

b)        client code:

package Socket;

import java.io.BufferedReader;

import java.io.InputStreamReader;

import java.io.PrintWriter;

import java.net.Socket;

public class Client {

       static Socket server;

       public static void main(String[] args) throws Exception {

              String host = "192.168.0. 10";

              int port = 10086;

              if (args.length > 1) {

                     host = args[0];

                     port = Integer.parseInt(args[1]);

              }

              System.out.println("connetioning:" + host + ":" + port);

              server = new Socket(host, port);

              PrintWriter out = new PrintWriter(server.getOutputStream());

              BufferedReader wt = new BufferedReader(new InputStreamReader(System.in));

              while (true) {

                     String str = wt.readLine();

                     out.println(str);

                     out.flush();

                     if (str.equals("end")) {

                            break;

                     }

              }

              server.close();

       }

}

2.         nio socket: 非阻塞通讯模式

a)         NIO 设计背后的基石:反应器模式

反应器模式: 用于事件多路分离和分派的体系结构模式。

反应器模式的核心功能如下:

n         将事件多路分用

n         将事件分派到各自相应的事件处理程序

b)        NIO 的非阻塞 I/O 机制是围绕 选择器 通道构建的。

选择器(Selector) Channel 的多路复用器。 Selector 类将传入客户机请求多路分用并将它们分派到各自的请求处理程序。

通道(Channel ):表示服务器和客户机之间的一种通信机制,一个通道负责处理一类请求/事件。

简单的来说:

NIO是一个基于事件的IO架构,最基本的思想就是:有事件我会通知你,你再去做与此事件相关的事情。而且NIO主线程只有一个,不像传统的模型,需要多个线程以应对客户端请求,也减轻了JVM的工作量。

c)        Channel注册至Selector以后,经典的调用方法如下:

        while (somecondition) {

            int n = selector.select(TIMEOUT);

            if (n == 0)

                continue;

            for (Iterator iter = selector.selectedKeys().iterator(); iter

                    .hasNext();) {

                if (key.isAcceptable())

                    doAcceptable(key);

                if (key.isConnectable())

                    doConnectable(key);

                if (key.isValid() && key.isReadable())

                    doReadable(key);

                if (key.isValid() && key.isWritable())

                    doWritable(key);

                iter.remove();

            }

        }

NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。

Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。

d)        Sample01

package NIO;

// ==================== Program Discription =====================

// 程序名称:示例12-14 : SocketChannelDemo.java

// 程序目的:学习Java NIO#SocketChannel

// ==============================================================

import java.net.InetSocketAddress;

import java.net.ServerSocket;

import java.nio.ByteBuffer;

import java.nio.channels.SelectableChannel;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

public class SocketChannelDemo {

       public static int PORT_NUMBER = 23;// 监听端口

       static String line = "";

       ServerSocketChannel serverChannel;

       ServerSocket serverSocket;

       Selector selector;

       private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

       public static void main(String[] args) throws Exception {

              SocketChannelDemo server = new SocketChannelDemo();

              server.init(args);

              server.startWork();

       }

       public void init(String[] argv) throws Exception {

              int port = PORT_NUMBER;

              if (argv.length > 0) {

                     port = Integer.parseInt(argv[0]);

              }

              System.out.println("Listening on port " + port);

              // 分配一个ServerSocketChannel

              serverChannel = ServerSocketChannel.open();

              // ServerSocketChannel里获得一个对应的Socket

              serverSocket = serverChannel.socket();

              // 生成一个Selector

              selector = Selector.open();

              // Socket绑定到端口上

              serverSocket.bind(new InetSocketAddress(port));

              // serverChannel为非bolck

              serverChannel.configureBlocking(false);

              // 通过Selector注册ServerSocetChannel

              serverChannel.register(selector, SelectionKey.OP_ACCEPT);

       }

       public void startWork() throws Exception {

              while (true) {

                     int n = selector.select();// 获得IO准备就绪的channel数量

                     if (n == 0) {

                            continue; // 没有channel准备就绪,继续执行

                     }

                     // 用一个iterator返回Selectorselectedkeys

                     Iterator it = selector.selectedKeys().iterator();

                     // 处理每一个SelectionKey

                     while (it.hasNext()) {

                            SelectionKey key = (SelectionKey) it.next();

                            // 判断是否有新的连接到达

                            if (key.isAcceptable()) {

                                   // 返回SelectionKeyServerSocketChannel

                                   ServerSocketChannel server = (ServerSocketChannel) key

                                                 .channel();

                                   SocketChannel channel = server.accept();

                                   registerChannel(selector, channel, SelectionKey.OP_READ);

                                   doWork(channel);

                            }

                            // 判断是否有数据在此channel里需要读取

                            if (key.isReadable()) {

                                   processData(key);

                            }

                            // 删除 selectedkeys

                            it.remove();

                     }

              }

       }

       protected void registerChannel(Selector selector,

                     SelectableChannel channel, int ops) throws Exception {

              if (channel == null) {

                     return;

              }

              channel.configureBlocking(false);

              channel.register(selector, ops);

       }

       // 处理接收的数据

       protected void processData(SelectionKey key) throws Exception {

              SocketChannel socketChannel = (SocketChannel) key.channel();

              int count;

              buffer.clear(); // 清空buffer

              // 读取所有的数据

              while ((count = socketChannel.read(buffer)) > 0) {

                     buffer.flip();

                     // send the data, dont assume it goes all at once

                     while (buffer.hasRemaining()) {

                            char c = (char) buffer.get();

                            line += c;

                            // 如果收到回车键,则在返回的字符前增加[echo]$字样,并且server端打印出字符串

                            if (c == (char) 13) {

                                   buffer.clear();

                                   buffer.put("[echo]$".getBytes());

                                   buffer.flip();

                                   System.out.println(line); //

                                   line = "";

                            }

                            socketChannel.write(buffer);// Socket里写数据

                     }

                     buffer.clear(); // 清空buffer

              }

              if (count < 0) {

                     // count<0,说明已经读取完毕

                     socketChannel.close();

              }

       }

       private void doWork(SocketChannel channel) throws Exception {

              buffer.clear();

              buffer

                            .put("Hello,I am working,please input some thing,and i will echo to you![echo]$"

                                          .getBytes());

              buffer.flip();

              channel.write(buffer);

       }

}

运行此程序,然后在控制台输入命令telnet localhost 23

e)         Server code:

public class NonBlockingServer

{

    public Selector sel = null;

    public ServerSocketChannel server = null;

    public SocketChannel socket = null;

    public int port = 4900;

    String result = null;

    public NonBlockingServer()

    {

              System.out.println("Inside default ctor");

    }

       public NonBlockingServer(int port)

    {

              System.out.println("Inside the other ctor");

              this.port = port;

    }

    public void initializeOperations() throws IOException,UnknownHostException

    {

              System.out.println("Inside initialization");

              sel = Selector.open();

              server = ServerSocketChannel.open();

              server.configureBlocking(false);

              InetAddress ia = InetAddress.getLocalHost();

              InetSocketAddress isa = new InetSocketAddress(ia,port);

              server.socket().bind(isa);

    }

       public void startServer() throws IOException

    {

              System.out.println("Inside startserver");

        initializeOperations();

              System.out.println("Abt to block on select()");

              SelectionKey acceptKey = server.register(sel, SelectionKey.OP_ACCEPT );     

              while (acceptKey.selector().select() > 0 )

              {    

          

                     Set readyKeys = sel.selectedKeys();

                     Iterator it = readyKeys.iterator();

                     while (it.hasNext()) {

                            SelectionKey key = (SelectionKey)it.next();

                            it.remove();

                

                            if (key.isAcceptable()) {

                                   System.out.println("Key is Acceptable");

                                   ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                                   socket = (SocketChannel) ssc.accept();

                                   socket.configureBlocking(false);

                                   SelectionKey another = socket.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);

                            }

                            if (key.isReadable()) {

                                   System.out.println("Key is readable");

                                   String ret = readMessage(key);

                                   if (ret.length() > 0) {

                                          writeMessage(socket,ret);

                                   }

                            }

                            if (key.isWritable()) {

                                   System.out.println("THe key is writable");

                                   String ret = readMessage(key);

                                   socket = (SocketChannel)key.channel();

                                   if (result.length() > 0 ) {

                                          writeMessage(socket,ret);

                                   }

                            }

                     }

              }

    }

    public void writeMessage(SocketChannel socket,String ret)

    {

              System.out.println("Inside the loop");

              if (ret.equals("quit") || ret.equals("shutdown")) {

                     return;

              }

              try

              {

                     String s = "This is content from server!-----------------------------------------";

                     Charset set = Charset.forName("us-ascii");

                     CharsetDecoder dec = set.newDecoder();

                     CharBuffer charBuf = dec.decode(ByteBuffer.wrap(s.getBytes()));

                     System.out.println(charBuf.toString());

                     int nBytes = socket.write(ByteBuffer.wrap((charBuf.toString()).getBytes()));

                     System.out.println("nBytes = "+nBytes);

                            result = null;

              }

              catch(Exception e)

              {

                     e.printStackTrace();

              }

    }

    public String readMessage(SelectionKey key)

    {

              int nBytes = 0;

              socket = (SocketChannel)key.channel();

        ByteBuffer buf = ByteBuffer.allocate(1024);

              try

              {

            nBytes = socket.read(buf);

                     buf.flip();

                     Charset charset = Charset.forName("us-ascii");

                     CharsetDecoder decoder = charset.newDecoder();

                     CharBuffer charBuffer = decoder.decode(buf);

                     result = charBuffer.toString();

          

        }

              catch(IOException e)

              {

                     e.printStackTrace();

              }

              return result;

    }

    public static void main(String args[])

    {

           NonBlockingServer nb;

           if (args.length < 1)

           {

                  nb = new NonBlockingServer();

           }

           else

           {

                  int port = Integer.parseInt(args[0]);

                  nb = new NonBlockingServer(port);

           }

               

              try

              {

                     nb.startServer();

                     System.out.println("the nonBlocking server is started!");

              }

              catch (IOException e)

              {

                     e.printStackTrace();

                     System.exit(-1);

              }

       }

}

2.2.4.2    Client code:

public class Client {

       public SocketChannel client = null;

       public InetSocketAddress isa = null;

       public RecvThread rt = null;

       private String host;

       private int port;

       public Client(String host, int port) {

              this.host = host;

              this.port = port;

       }

       public void makeConnection() {

              String proxyHost = "192.168.254.212";

              String proxyPort = "1080";

              System.getProperties().put("socksProxySet", "true");

              System.getProperties().put("socksProxyHost", proxyHost);

              System.getProperties().put("socksProxyPort", proxyPort);

              int result = 0;

              try {

                     client = SocketChannel.open();

                     isa = new InetSocketAddress(host, port);

                     client.connect(isa);

                     client.configureBlocking(false);

                     receiveMessage();

              } catch (UnknownHostException e) {

                     e.printStackTrace();

              } catch (IOException e) {

                     e.printStackTrace();

              }

              long begin = System.currentTimeMillis();

              sendMessage();

              long end = System.currentTimeMillis();

              long userTime = end - begin;

              System.out.println("use tiem: " + userTime);

              try {

                     interruptThread();

                     client.close();

                     System.exit(0);

              } catch (IOException e) {

                     e.printStackTrace();

              }

       }

       public int sendMessage() {

              System.out.println("Inside SendMessage");

              String msg = null;

              ByteBuffer bytebuf;

              int nBytes = 0;

              try {

                     msg = "It's message from client!";

                     System.out.println("msg is "+msg);

                     bytebuf = ByteBuffer.wrap(msg.getBytes());

                     for (int i = 0; i < 1000; i++) {

                            nBytes = client.write(bytebuf);

                            System.out.println(i + " finished");

                     }

                     interruptThread();

                     try {

                            Thread.sleep(5000);

                     } catch (Exception e) {

                            e.printStackTrace();

                     }

                     client.close();

                     return -1;

              } catch (IOException e) {

                     e.printStackTrace();

              }

              return nBytes;

       }

       public void receiveMessage() {

              rt = new RecvThread("Receive THread", client);

              rt.start();

       }

       public void interruptThread() {

              rt.val = false;

       }

       public static void main(String args[]) {

              if (args.length < 2) {

                     System.err.println("You should put 2 args: host,port");

              } else {

                     String host = args[0];

                     int port = Integer.parseInt(args[1]);

                     Client cl = new Client(host, port);

                     cl.makeConnection();

              }

              BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

              String msg;

       }

       public class RecvThread extends Thread {

              public SocketChannel sc = null;

              public boolean val = true;

              public RecvThread(String str, SocketChannel client) {

                     super(str);

                     sc = client;

              }

              public void run() {

                     int nBytes = 0;

                     ByteBuffer buf = ByteBuffer.allocate(2048);

                     try {

                            while (val) {

                                   while ((nBytes = nBytes = client.read(buf)) > 0) {

                                          buf.flip();

                                          Charset charset = Charset.forName("us-ascii");

                                          CharsetDecoder decoder = charset.newDecoder();

                                          CharBuffer charBuffer = decoder.decode(buf);

                                          String result = charBuffer.toString();

                                          System.out.println("the server return: " + result);

                                          buf.flip();

                                   }

                            }

                     } catch (IOException e) {

                            e.printStackTrace();

                     }

              }

       }

}

Reactor模式和NIO

当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:

u        Read request

u        Decode request

u        Process service

u        Encode reply

u        Send reply

经典的网络服务的设计如下图,在每个线程中完成对数据的处理:

但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。

Reactor模式类似于AWT中的Event处理:

Reactor模式参与者

1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWTthread
2.Handler
是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlersevent事件绑定,类似于AWT addActionListener

如图:

JavaNIOreactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。

我们来看看Reactor模式代码:


public class Reactor implements Runnable{

  final Selector selector;
  final ServerSocketChannel serverSocket;

  Reactor(int port) throws IOException {
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
    serverSocket.socket().bind(address);

    serverSocket.configureBlocking(false);
    //selector注册该channel
     SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

    logger.debug("-->Start serverSocket.register!");

    //利用skattache功能绑定Acceptor 如果有事情,触发Acceptor
    sk.attach(new Acceptor());
    logger.debug("-->attach(new Acceptor()!");
  }


  public void run() { // normally in a new Thread
    try {
    while (!Thread.interrupted())
    {
      selector.select();
      Set selected = selector.selectedKeys();
      Iterator it = selected.iterator();
      //Selector如果发现channelOP_ACCEPTREAD事件发生,下列遍历就会进行。
      while (it.hasNext())
        //来一个事件第一次触发一个accepter线程
        //以后触发SocketReadHandler
        dispatch((SelectionKey)(it.next()));
        selected.clear();
      }
    }catch (IOException ex) {
        logger.debug("reactor stop!"+ex);
    }
  }

  //运行AcceptorSocketReadHandler
  void dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());
    if (r != null){
      // r.run();

    }
  }

  class Acceptor implements Runnable { // inner
    public void run() {
    try {
      logger.debug("-->ready for accept!");
      SocketChannel c = serverSocket.accept();
      if (c != null)
        //调用Handler来处理channel
        new SocketReadHandler(selector, c);
      }
    catch(IOException ex) {
      logger.debug("accept stop!"+ex);
    }
    }
  }
}

以上代码中巧妙使用了SocketChannelattach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler

再看看Handler代码:

public class SocketReadHandler implements Runnable {

  public static Logger logger = Logger.getLogger(SocketReadHandler.class);

  private Test test=new Test();

  final SocketChannel socket;
  final SelectionKey sk;

   static final int READING = 0, SENDING = 1;
  int state = READING;

  public SocketReadHandler(Selector sel, SocketChannel c)
    throws IOException {

    socket = c;

    socket.configureBlocking(false);
     sk = socket.register(sel, 0);

    //SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
    //参看dispatch(SelectionKey k)
    sk.attach(this);

    //同时将SelectionKey标记为可读,以便读取。
    sk.interestOps(SelectionKey.OP_READ);
    sel.wakeup();
  }

  public void run() {
    try{
    // test.read(socket,input);
      readRequest() ;
    }catch(Exception ex){
    logger.debug("readRequest error"+ex);
    }
  }


/**
*
处理读取data
* @param key
* @throws Exception
*/
private void readRequest() throws Exception {

  ByteBuffer input = ByteBuffer.allocate(1024);
  input.clear();
  try{

    int bytesRead = socket.read(input);

    ......

    //激活线程池处理这些request
    requestHandle(new Request(socket,btt));

    .....


  }catch(Exception e) {
  }

}

注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理 写 发出等流程处理。

将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:

更进一步,我们可以使用多个Selector分别处理连接和读事件。

一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。

3.         Socket网络框架 MINA

a)         Overview

MINA是一个网络应用框架,在不牺牲性能和可扩展性的前提下用于解决如下问题:

n         快速开发自己的应用。

n         高可维护性,高可复用性:网络I/O编码,消息的编/解码,业务逻辑互相分离。

n         相对容易的进行单元测试。

b)        MINA架构:

IoSessionManager: Where real I/O occurs

IoFilters: Filters I/O events • requests

IoHandler: Your protocol logic

IoSession: Represents a connection

n         IoFilters

IoFilterMINA的功能扩展提供了接口。它拦截所有的IO事件进行事件的预处理和河畜处理(AOP)。我们可以把它想象成Servletfilters

IoFilter能够实现以下几种目的:

事件日志

性能检测

数据转换(e.g. SSL support)codec

防火墙…等等

n         codec: ProtocolCodecFactory

MINA提供了方便的Protocol支持。如上说讲,codecIoFilters中设置。

通过它的EncoderDecoder,可以方便的扩展并支持各种基于Socket的网络协议,比如HTTP服务器、FTP服务器、Telnet服务器等等。

要实现自己的编码/解码器(codec)只需要实现interface: ProtocolCodecFactory即可.

MINA 1.0版本,MINA已经实现了几个常用的(codec factory):

DemuxingProtocolCodecFactory,

NettyCodecFactory,

ObjectSerializationCodecFactory,

TextLineCodecFactory

其中:

n         TextLineCodecFactory:

A ProtocolCodecFactory that performs encoding and decoding between a text line data and a Java

string object. This codec is useful especially when you work with a text-based protocols such as SMTP and IMAP.

n         ObjectSerializationCodecFactory:

A ProtocolCodecFactory that serializes and deserializes Java objects. This codec is very useful when

you have to prototype your application rapidly without any specific codec.

n         DemuxingProtocolCodecFactory

A composite ProtocolCodecFactory that consists of multiple MessageEncoders and MessageDecoders. ProtocolEncoder and ProtocolDecoder this factory returns demultiplex incoming messages and buffers to appropriate MessageEncoders and MessageDecoders.

n         NettyCodecFactory:

A MINA ProtocolCodecFactory that provides encoder and decoder for Netty2 Messages and MessageRecognizers.

n         IoHandler :business logic

MINA中,所有的业务逻辑都在实现了IoHandlerclass完成。

Interface Handle:

 all protocol events fired by MINA. There are 6 event handler methods, and they are all invoked by MINA automatically.

 当事件发生时,将触发IoHandler中的方法:

 sessionCreated:当一个session创建的时候调用;

 sessionOpened:在sessionCreated调用之后被调用;

sessionClosed:当IO连接被关闭时被调用;

 sessionIdle:当在远程实体和用户程序之间没有数据传输的时候被调用;

exceptionCaught:当IoAcceptor 或者IoHandler.中出现异常时被调用;

messageReceived:当接受到消息时调用;

messageSent:当发出请求时调用。

MINA 1.0中,IoHandler的实现类:

ChainedIoHandler

 DemuxingIoHandler,

IoHandlerAdapter

 SingleSessionIoHandlerDelegate

 StreamIoHandler

具体细节可参考javadoc

c)        MINA的高级主题:线程模式

MINA通过它灵活的filter机制来提供多种线程模型。

没有线程池过滤器被使用时MINA运行在一个单线程模式。

如果添加了一个IoThreadPoolFilterIoAcceptor,将得到一个leader-follower模式的线程池。

如果再添加一个ProtocolThreadPoolFilterserver将有两个线程池:

一个(IoThreadPoolFilter)被用于对message对象进行转换,另外一个(ProtocolThreadPoolFilter)被用于处理业务逻辑。

SimpleServiceRegistry加上IoThreadPoolFilterProtocolThreadPoolFilter的缺省实现即可适用于需要高伸缩性的应用。如果想使用自己的线程模型,请参考SimpleServiceRegistry的源代码,并且自己

初始化Acceptor

IoThreadPoolFilter threadPool = new IoThreadPoolFilter();threadPool.start();

IoAcceptor acceptor = new SocketAcceptor();

acceptor.getFilterChain().addLast( "threadPool", threadPool);

ProtocolThreadPoolFilter threadPool2 = new ProtocolThreadPoolFilter();

threadPool2.start();

ProtocolAcceptor acceptor2 = new IoProtocolAcceptor( acceptor );

acceptor2.getFilterChain().addLast( "threadPool", threadPool2 );

...

threadPool2.stop();

threadPool.stop();

d)        采用MINA进行socket开发,一般步骤如下:

n         Begin:

IoAcceptor acceptor = new SocketAcceptor(); //建立client接收器

or client:

SocketConnector connector = new SocketConnector(); //建立一个连接器

n         server的属性配置:

        SocketAcceptorConfig cfg = new SocketAcceptorConfig();

        cfg.setReuseAddress(true);

        cfg.getFilterChain().addLast(

                    "codec",

                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) ); //对象序列化 codec factory

        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

n         绑定addressbusiness logic

server:

        acceptor.bind(

                new InetSocketAddress( SERVER_PORT ),

                new ServerSessionHandler( ), cfg ); // 绑定addresshandler

client:

        connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                        new ClientSessionHandler(msg), cfg );

n         实现自己的业务逻辑: IoHandler

n         如有必要,实现自己的CODEC

下面的代码演示了采用ObjectSerializationCodecFactory给服务端传送文件:

e)         Client

public class Client

{

    private static final String HOSTNAME = "192.168.0.81";

    private static final int PORT = 8080;

    private static final int CONNECT_TIMEOUT = 30; // seconds

    public static void main( String[] args ) throws Throwable

    {

        System.out.println("in nio client");

        SocketConnector connector = new SocketConnector();       

        // Configure the service.

        SocketConnectorConfig cfg = new SocketConnectorConfig();

        cfg.setConnectTimeout( CONNECT_TIMEOUT );

          cfg.getFilterChain().addLast(

                    "codec",

                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

        IoSession session;

        if(args.length > 1)

        {

            connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                    new ClientSessionHandler(args), cfg );

        }

        else

        {

            String[] files = {"E:/music/lcl/juhuatai.mp3",

                                    "E:/music/lcl/jimosazhouleng.mp3"};

            connector.connect(new InetSocketAddress( HOSTNAME, PORT ),

                    new ClientSessionHandler(files), cfg );

        }

    }

}

f)         Clint handleclient端的业务代码)

public class ClientSessionHandler extends IoHandlerAdapter

    private String[] files;

    public ClientSessionHandler(String[] files)

    {

        this.files = files;

    }

    public void sessionOpened( IoSession session )

    {

        for (int i = 0; i < this.files.length; i++)

        {

            Thread sendMessageThread = new SendMessageThread("Thread" + i, session,files[i]);

            sendMessageThread.start();

        }

    }

    public void messageReceived( IoSession session, Object message )

    {

        System.out.println("in messageReceived!");

    }

    public void exceptionCaught( IoSession session, Throwable cause )

    {

        session.close();

    }

    public class SendMessageThread extends Thread

    {

        private IoSession session;

        private String filename;

        public SendMessageThread(String name, IoSession session, String filename)

        {

            super(name);

            this.session = session;

            this.filename = filename;

        }

        public void run()

        {

            System.out.println("start thread: " + this.getName());

            try {               

                ByteBuffer buf = ByteBuffer.allocate(Constants.BUF_SIZE);

                

                FileChannel fc = new FileInputStream(filename).getChannel();

                int index;

                while ((index = NioFileUtil.readFile(fc, buf)) > 0)

                {

                  buf.flip();

                  byte[] bs;

                  if (index == buf.capacity())

                  {

                      bs = buf.array();

                  }

                  else

                  {

                      bs = new byte[index];

                      int i = 0;

                     while (buf.hasRemaining())

                      {

                          bs[i++] = buf.get();

                      }

                  }

                  Message msg = new Message(filename,Constants.CMD_SEND, bs);

                  session.write(msg);

                }

                Message msg = new Message(filename, Constants.CMD_FINISHED, null);

                session.write(msg);        

            } catch (Exception e) {

                e.printStackTrace();

            }          

        }

    }

}

g)        Server

public class Server

{

    private static final int SERVER_PORT = 8080;

    public static void main( String[] args ) throws Throwable

    {

        IoAcceptor acceptor = new SocketAcceptor();

        // Prepare the service configuration.

        SocketAcceptorConfig cfg = new SocketAcceptorConfig();

        cfg.setReuseAddress( true );

        cfg.getFilterChain().addLast(

                    "codec",

                    new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );

        cfg.getFilterChain().addLast( "logger", new LoggingFilter() );

        acceptor.bind(

                new InetSocketAddress( SERVER_PORT ),

                new ServerSessionHandler( ), cfg );

        System.out.println( "nioFileServer Listening on port " + SERVER_PORT );

    }

}

h)        Server handle:(Server端业务代码)

public class ServerSessionHandler extends IoHandlerAdapter

{   

    public void sessionOpened( IoSession session )

    {

        // set idle time to 60 seconds

        System.out.println("in sessionOpened");

        session.setIdleTime( IdleStatus.BOTH_IDLE, 60 );

        session.setAttribute("times",new Integer(0));

    }

    public void messageReceived( IoSession session, Object message )

    {

        System.out.println("in messageReceived");

           Message msg = (Message) message;

           System.out.println("the file name is: " + msg.getFileName() + ""n");

           this.process(session, msg);

          

    }

    private void process(IoSession session, Message message)

    {

        String[] temparray = message.getFileName().split("[//]");

        String filename ="d:/" + temparray[temparray.length - 1];

        if (session.containsAttribute(message.getFileName()))

        {

            FileChannel channel = (FileChannel)session.getAttribute(message.getFileName());

            if (message.getType().equals(Constants.CMD_SEND))

            {

                try {

                    NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));

                } catch (Exception e) {

                    e.printStackTrace();

                }               

            }

            else

            {

                try {

                    channel.close();

                    channel = null;

                    session.removeAttribute(message.getFileName());

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

        else

        {

            try {

                FileChannel channel = new FileOutputStream(filename).getChannel();

                NioFileUtil.writeFile(channel, ByteBuffer.wrap(message.getContent()));

                session.setAttribute(message.getFileName(), channel);

            } catch (Exception e) {

                // TODO Auto-generated catch block

                e.printStackTrace();

            }          

        }

    }

    public void sessionIdle( IoSession session, IdleStatus status )

    {

        SessionLog.info( session, "Disconnecting the idle." );

        // disconnect an idle client

        session.close();

    }

    public void exceptionCaught( IoSession session, Throwable cause )

    {

        // close the connection on exceptional situation

        session.close();

    }

}

i)          文件操作:

public class NioFileUtil {

    public static void writeFile(FileChannel fileChannel, ByteBuffer buf) throws Exception

    {

        buf.clear();

        fileChannel.write(buf);    

    }

    public static int readFile(FileChannel fileChannel,ByteBuffer buf) throws IOException

    {

        buf.rewind();

        int index = fileChannel.read(buf);

        return index;

    } 

}

j)          常量:

public class Constants {

    public static final String CMD_FINISHED = "FINISHED";

    public static final String CMD_SEND = "SEND";    

    public static final int BUF_SIZE = 10240;

    private Constants(){}   

}

Demo

Introduction

org.apache.mina.example.chat

Chat server which demonstates using the text line codec and Spring integration.

org.apache.mina.example.chat.client

Swing based chat client.

org.apache.mina.example.echoserver

Echo server which demonstates low-level I/O layer and SSL support.

org.apache.mina.example.echoserver.ssl

SSL support classes.

org.apache.mina.example.httpserver.codec

A HTTP server implemented with protocol codec (needs more work).

org.apache.mina.example.httpserver.stream

A simplistic HTTP server which demonstates stream-based I/O support.

org.apache.mina.example.netcat

NetCat client (Network + Unix cat command) which demonstates low-level I/O layer.

org.apache.mina.example.proxy

A TCP/IP tunneling proxy example.

org.apache.mina.example.reverser

Reverser server which reverses all text lines demonstating high-level protocol layer.

org.apache.mina.example.sumup

SumUp Server and Client which sums up all ADD requests.

org.apache.mina.example.sumup.codec

Protocol codec implementation for SumUp protocol.

org.apache.mina.example.sumup.message

Protocol mmessage classes for SumUp protocol.

org.apache.mina.example.tennis

Two tennis players play a game which demonstates in-VM pipes.

n         友情提示:

下载并运行MINAdemo程序还颇非周折:

运行MINA demo applition

1:JDK5

产生错误:

Exception in thread "main" java.lang.NoClassDefFoundError: edu/emory/mathcs/backport/java/util/concurrent/Executor

       at org.apache.mina.example.reverser.Main.main(Main.java:44)

察看minaQA email:

http://www.mail-archive.com/mina-dev@directory.apache.org/msg02252.html

原来需要下载:backport-util-concurrent.jar并加入classpath

http://dcl.mathcs.emory.edu/util/backport-util-concurrent/

继续运行还是报错:

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory

原来MINA采用了slf4j项目作为log,继续下载

slf4j-simple.jar等,并加入classpath:

http://www.slf4j.org/download.html

posted on 2007-09-03 15:38 哼哼 阅读(2955) 评论(0)  编辑  收藏 所属分类: JAVA-Web

只有注册用户登录后才能发表评论。


网站导航: