海阔天空

I'm on my way!
随笔 - 17, 文章 - 69, 评论 - 21, 引用 - 0
数据加载中……

一个Tcp网络服务框架例子

WindowsIOCPLinuxepollFreeBSDkqueue写了一个支持高并发、多CPU、跨平台的TCP网络服务框架。

测试

下载netfrm.v2.rar,解压缩得到netfrm.v2目录,里面有netfrm.v2.vcprojsrc目录。
测试代码在src/main.cpp

#include <stdio.h>
#include "./lance/ldebug.h"
#include "./lance/tcpsrv.hpp"
#include "./lance/systm.h"
class MyClient : public lance::net::Client
{
public: void OnConnect()
         {
              printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);

              recv(data, 255);
         }
public: void OnDisconnect()
         {
              printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
         }
public: void OnRecv(int len)
         {
              data[len] = 0x00;
              printf("OnRecv: fd=%08x, data=%s\n", fd, data);

              if (data[0] == 'a')
              {
                   printf("user exit command\n");
                   close();
              }
              recv(data, 255);
         }

public: char data[256];
};


int main(char * args[])
{
     lance::net::TCPSrv<MyClient> srv;

     srv.ip = 0;
     srv.port = 1234;
     srv.ptr = NULL;
     srv.backlogs = 10;
     srv.threads = 1;
     srv.scheds = 0;
     srv.start();

     while(true)
     {
         lance::systm::sleep(2000);
     }

     return 0;
}

测试代码绑定本机所有IP地址,在1234端口开启网络服务,接收客户端发送的字符串,并将这些字符串打印到控制台上。

Windows平台

Windows XP SP2下用vs2003编译测试通过。
vs2003打开netfrm.v2.vcproj,然后编译、运行,会弹出控制台窗口。
Windows开始菜单->运行->cmd,启动Windows命令窗口,输入telnet 127.0.0.1 1234,回车连接到测试网络服务,如果一切正常,网络服务控制台窗口将显示连接信息,可以在Windows命令窗口随便输入信息,这时网络服务控制台窗口将打印输入的信息。
如下图所示:
500)this.width=500;" border="0" width="500">
1
输入字符a表示断开网络连接。

Linux平台

LinuxRed Hat Enterprise Linux 4下测试通过,其他Linux平台需要Linux 2.6及以上支持epoll的内核。
首先转到src目录:
$ cd src
编译:
$ make –f Makefile.linux clean all
这时会在当前目录生成tcpsrv.0.1.bin的可执行文件,执行:
$ ./ tcpsrv.0.1.bin
再打开一个命令行窗口,测试:
$ telnet 127.0.0.1 1234
输入字符串并回车,刚才执行tcpsrv.0.1.bin的窗口将打印连接和字符串信息。
输入a开头的字符串将断开连接。

FreeBSD平台

FreeBSDFreeBSD 6.2下测试通过,其他BSD平台需要支持kqueue的内核。
首先转到src目录:
$ cd src
编译:
$ make –f Makefile.freebsd clean all
这时会在当前目录生成tcpsrv.0.1.bin的可执行文件,执行:
$ ./ tcpsrv.0.1.bin
再打开一个命令行窗口,测试:
$ telnet 127.0.0.1 1234
输入字符串并回车,刚才执行tcpsrv.0.1.bin的窗口将打印连接和字符串信息。
输入a开头的字符串将断开连接。
 

使用

目录结构:
src
|---lance
   |---tcpsrv.hpp 主要接口文件
   |---iocptcpsrv.hpp Windows IOCP网络服务实现文件
   |---eptcpsrv.hpp Linux epoll网络服务实现文件
   |---kqtcpsrv.hpp FreeBSD kqueue网络服务实现文件
在某种平台下使用时,src/lance/tcpsrv.hpp必须,其他文件根据平台而定。
 
首先,创建一个Client类,这个类必须继承lance::net::Client,重载事件通知方法。

// Client对象类,当连接建立时自动创建,当连接断开时自动销毁
class MyClient : public lance::net::Client
{
         // 连接建立时被调动
public: void OnConnect()
         {
              printf("OnConnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
              // 通知调度系统接收数据
              // 数据这时并没有真正接收,当客户端有数据发送来时
              // 调度器自动接收数据,然后通过OnRecv通知数据接收完成
              recv(data, 255);
         }
         // 连接断开时被调用
public: void OnDisconnect()
         {
              printf("OnDisconnect: fd=%08x, ip=%d, port=%d\n", fd, ip, port);
         }
         // 当有数据被接收时调用,接收的实际数据长度为len
public: void OnRecv(int len)
         {
              data[len] = 0x00;
              printf("OnRecv: fd=%08x, data=%s\n", fd, data);
              // 断开连接命令
              if (data[0] == 'a')
              {
                   printf("user exit command\n");
                   // 通知调度系统断开连接,当调度系统处理完成后才真正断开连接
                   close();
              }
              // 通知调度系统接收数据
              // 数据这时并没有真正接收,当客户端有数据发送来时
              // 调度器自动接收数据,然后通过OnRecv通知数据接收完成
              recv(data, 255);
         }
// 数据缓冲区
public: char data[256];
};

 
然后创建一个lance::net::TCPSrv<T>的实例,这个实例负责调度网络服务。
 
具体代码参考src/main.cpplance::net::ClientOnConnectOnRecvOnDisconnect都由工作线程池处理,所以里面可以进行IO操作而不会影响系统响应。

int main(char * args[])
{
     lance::net::TCPSrv<MyClient> srv;
     
// 设置监听套接字绑定的IP
     
// 0为绑定所有本机可用IP地址
     srv.ip = 0;
     
// 监听端口
     srv.port = 1234;
     
// 绑定的对象或资源指针
     
// MyClient里面可以通过srv->ptr获取这个指针
     srv.ptr = NULL;
     
// 监听套接字连接队列长度
     srv.backlogs = 10;
     
// 处理线程池线程数
     srv.threads = 1;
     
// 调度器线程数,通常是本机CPU数的2倍
     
// 0表示自动选择
     srv.scheds = 0;
     
// 启动网络服务
     srv.start();
     
// 循环,保证进程不退出
     while(true)
     {
         lance::systm::sleep(2000);
     }
     return 0;
}

 
Windows平台的预编译宏是LANCE_WIN32
Linux平台的预编译宏是LANCE_LINUX
FreeBSD平台的预编译宏是LANCE_FREEBSD
 
Windows平台编译需要使用WIN32_LEAN_AND_MEAN_WIN32_WINNT=0x0500预编译宏来避免Winsock2Windows头文件的冲突,否则会产生大量类型重定义错误。

#define EPOLL_MAX_NFDS          10000    // max sockets queried by epoll.
#define EPOLL_MAX_EVENTS        100      // max events queried by epoll.
#define EPOLL_MAX_QUEUE         1024     // max events in cache queue.

 
Linux平台有额外三个预编译宏,参考src/lance/eptcpsrv.hpp
FreeBSD平台有额外三个预编译宏,参考src/lance/kqtcpsrv.hpp

#define KQUEUE_MAX_NFDS 10000 // max sockets queried by kqueue.
#define KQUEUE_MAX_EVENTS 100 // max events queried by kqueue.
#define KQUEUE_MAX_QUEUE 1024 // max events in cache queue.

Windows IOCP设计

首先用户接口部分,由两个类lance::net:TCPSrv<T>lance::net::Client
lance::net::TCPSrv<T>管理监听套接字、事件调度和事件处理。
lance::net::Client管理连接套接字。
lance::net::TCPSrv<T>lance::net::Listener<T>lance::net::Scheduler<T>lance::net::Processor<T>组成。
他们之间的关系如下:
500)this.width=500;" border="0" width="500">
2
Listener<T>管理监听套接字,有单独的线程执行,当有连接到来时,创建一个Client的对象实例,然后通过IOCP系统调用通知调度器有连接到来,参考src/lance/iocptcpsrv.hpp

template<typename T>
void Scheduler<T>::push(T * clt)
{
     ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
}

Scheduler<T>实际并不做很多事情,只是封装IOCP句柄,WindowsIOCP功能很丰富,包括管理事件队列和多CPU支持,所以Scheduler只是一个IOCP的映射。
Processor<T>管理线程池,这些线程池是工作线程,他们轮询SchedulerIOCP,从中取出系统事件,IOCP里面有三种事件,一种是客户端连接事件,一种是客户端数据事件,最后一种是连接断开事件,当有事件到来时,会得到Client对象的指针cltClientevent包含了事件类型,参考src/lance/iocptcpsrv.hpp

template<typename T>
DWORD WINAPI Processor<T>::run(LPVOID param)
{
     Processor<T>& procor = *(Processor<T> *)param;
     Scheduler<T>& scheder = *procor.scheder;
     HANDLE iocp = scheder.iocp;

     DWORD ready;
     ULONG_PTR key;
     WSAOVERLAPPED * overlap;
     while (true)
     {
         ::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE);

         T * clt = (T *)key;
         switch(clt->event)
         {
         case T::EV_RECV:
              {
                   if (0 >= ready)
                   {
                       clt->event = T::EV_DISCONNECT;
                       ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
                   }
                   else
                   {
                       clt->OnRecv(ready);
                   }
              }
              break;
         case T::EV_CONNECT:
              {
                   if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0))
                   {
                       ::closesocket(clt->fd);
                       delete clt;
                   }
                   else
                   {
                       clt->OnConnect();
                   }
              }
              break;
         case T::EV_DISCONNECT:
              {
                   clt->OnDisconnect();
                   ::closesocket(clt->fd);
                   delete clt;
              }
              break;
         case T::EV_SEND:
              break;
         }
     }

     return 0;
}

所以Client::OnConnectClient::OnRecvClient::OnDisconnect都在工作线程中进行,这些处理过程中都可以有IO等耗时操作,一个连接的阻塞不会影响其他连接的响应速度。
 
Client的其他方法Client::recvClient::sendClient::close
Client::recv是一个异步接收数据的方法,这个方面只是告诉IOCP想要接收客户端的数据,然后立即返回,由IOCP去负责接收数据,有数据收到时,Processor<T>的工作线程会收到Client::EV_RECV的消息,Processor<T>会调用Client::OnRecv进行通知。
Client::send是发送消息的函数,这个函数是阻塞调用,等待消息发送成功后才返回。
Client::close是主动断开客户端连接的方法,这个方法不会直接调用closesocket(fd),而是调用shutdown(fd)shutdown(fd)会向Scheduler<T>触发一个Client::EV_DISCONNECT的事件,然后Processor<T>调用Client::OnDisconnect通知连接断开,执行完Client::OnDisconnect后,由Processor<T>调用closesocket(fd)真正断开连接,这样设计一方面满足任何情况下OnDisconnect都被调用,另一方面因为操作系统会重用已经关闭的套接字fd,所以只有当OnDisconnect执行完毕后才真正调用closesocket让操作系统回收fd,可以避免使用无效的套接字或者挪用其他连接的套接字。
 

Linux epollFreeBSD kqueue设计

Linux epollFreeBSD kqueue的机制几乎一样,只有函数名字和个数不一样,所以一起分析,并且简写为Linux
因为Linux不像Windows一样会管理事件队列和多CPU支持,所以Linux需要额外实现事件队列和多CPU支持。
Linux下用户接口跟Windows一样,有lance::net::TCPSrv<T>lance::net::Client,因为跨平台,所以他们提供的接口功能和意义也一样,参考Windows一节。
lance::net::TCPSrv<T>管理连接套接字、事件队列、多CPU支持、事件调度和事件处理。
lance::net::TCPSrv<T>Listener<T>Scheduler<T>Processor<T>Queue<T>组成。
他们之间关系图如下:
500)this.width=500;" border="0" width="500">
3
Listener<T>管理监听套接字,有连接到来时创建一个Client的实例clt,初始化Client::eventClient::EV_CONNECT,然后将clt放入调度器,调度器为clt选择一个合适的epoll/kqueue进行绑定,然后将clt放入事件队列Queue<T>等待被Processor<T>执行。
 
Scheduler<T>管理epoll/kqueue,为了支持多CPU,一个Scheduler<T>可能管理多个epoll/kqueue,通过lance::net::TCPSrv::scheds进行设置,当lance::net::TCPSrv::scheds大于1时,Scheduler<T>将创建scheds个线程,每个线程管理一个epoll/kqueue。当Listener<T>提交一个新的clt时,Scheduler<T>顺序选择一个epoll/kqueue进行绑定,这是最简单的均等选择算法,epoll/kqueue会检查绑定的clt的数据接收和连接断开事件,如果有事件,会把产生这个事件的clt放入事件队列Queue<T>等待被Processor<T>执行,并且设置clt的套接字为休眠状态,因为epoll/kqueue为状态触发,如果事件在被Processor<T>处理前不休眠,会再次被触发,这样Queue<T>将被迅速填满。
CPU时,依靠多个epoll/kqueue能有效利用这些CPU
参考eptcpsrv.hpp

template<typename T>
void Scheduler<T>::push(T * clt)
{
     clt->epfd = epers[epoff].epfd;
     epoff = (epoff+1 == scheds)?0:(epoff+1);
     queue.in();
     while (queue.full())
     {
         queue.fullWait();
     }
     if (queue.empty())
     {
         queue.emptyNotify();
     }
     queue.push(clt);
     queue.out();
}

 
Queue<T>是有限缓冲队列,有队列最大长度EPOLL_MAX_QUEUE/KQUEUE_MAX_QUEUE,有限缓冲队列结构如下:

500)this.width=500;" border="0">

4
Queue<T>采用monitor模式,使用pthread_mutex_t lock保护临界区,使用pthread_cond_t emptySignal做队列由空到不空的通知,也就是唤醒消费者可以处理队列,使用pthread_cond_t fullSignal做队列由满到不满的通知,也就是唤醒生产者可以填充队列,这里Scheduler<T>是生产者,Processor<T>是消费者。
有时epoll/kqueue会一次产生多个事件,如果先前队列为空,那么需要通知Processor<T>可以处理事件,因为emptySignal.notify只能一次唤醒一个线程,为了更加高效的处理事件,应该使用emptySignal.broadcast唤醒所有工作线程。
如果epoll/kqueue一次只产生了一个事件,并且先前队列为空,那么只需要使用emptySignal.notify唤醒一个工作线程而不应该使用emptySignal.broadcast唤醒工作线程,因为只有一个事件,所以只有一个线程会处理事件,而其他线程会空转一次消耗资源。
如果epoll/kqueue产生了事件,但是队列不为空,那么不需要唤醒工作线程的操作,因为队列不为空的时候,没有任何工作线程处于等待状态。
代码参考eptcpsrv.hpp/Queue<T>
 
Processor<T>Windows基本一样,Processor<T>Queue<T>取出事件,然后根据clt->event事件类型调用响应的事件通知函数。
 
Client::recv也是一个请求接收数据的过程,并不实际接收数据,当有数据到来时,Processor<T>的工作线程负责接收数据,然后调用Client::OnRecv通知响应的连接对象。
Cleint::send是一个同步阻塞函数,等待数据真正发送完成后再返回。
Client::closeWindows类似,只是调用shutdown来触发断开消息,然后处理流程跟Windows一致。




转自:http://blog.chinaunix.net/u1/52224/showart_425449.html

posted on 2009-07-27 22:09 石头@ 阅读(1510) 评论(0)  编辑  收藏 所属分类: Tcp/Ip


只有注册用户登录后才能发表评论。


网站导航: