使用ZooKeeper为CXF或其他服务动态更新服务器信息




ZooKeeper是一个优秀的协调服务, 目前是Hadoop的一个子项目. 我们可以用它来为我们的服务提供配置中心, 注册中心, 分布式同步锁, 消息队列等服务, 更多信息请浏览 http://hadoop.apache.org/zookeeper/

上篇文章中实现一个CXF的负载均衡服务, 本次我们使用ZooKeeper来为我们的服务提供动态服务器列表, 以便把客户端的调用分配到各个有效的服务上去.


动态更新服务列表有2种方法
  * 定时去获取数据, 更新我们的数据 --- 通用
  * 使用ZooKeeper的watch特性, 有服务器加入/退出时我们自动获取通知 --- 适用于有消息通知机制的


   首先我们的HelloService部分要向ZooKeeper注册
    只有注册到ZooKeeper上, 我们才知道你可以提供这个服务. 在实际环境中, 需要每个服务都需要向ZooKeeper注册 ()
   
    注册代码如下:

    private void register2Zookeeper(String address) throws Exception
    {
        ZooKeeper zk 
= new ZooKeeper(zkAddress, 3000null);

        GroupMemberCenter gmc 
= new GroupMemberCenter();
        gmc.setZooKeeper(zk);

        gmc.createAndSetGroup(groupName);
        gmc.joinGroupByDefine(address);

        System.out.println(
"register service to zookeeper: " + address);
    }





    GroupMemberCenter是一个辅助类, 代码如下:


    /**
     * Dynamic member center.
     * <p/>
     * The member maybe leave or dead dynamiclly.
     *
     *
     * 
@author: Felix Zhang  Date: 2010-9-30 17:58:16
     
*/
    
public class GroupMemberCenter
    {
        
public static final String ESCAPE_PREFIX = "|||";

        
private static final Log log = LogFactory.getLog(GroupMemberCenter.class);
        
private static final List<String> EMPTY_MEMBERS = new ArrayList<String>(0);

        
private ZooKeeper zk;
        
private String group = "";

        
private String me = "";

        
public void setZooKeeper(ZooKeeper zk)
        {
            
this.zk = zk;
        }

        
public void setGroup(String groupName)
        {
            
if (groupName != null && groupName.length() > 0)
            {
                
if (!groupName.startsWith("/"))
                {
                    groupName 
= "/" + groupName;
                }

                
this.group = groupName;
            }
        }

        
public boolean createAndSetGroup(String groupName)
        {
            
boolean result = createGroup(groupName);

            
if (result)
            {
                setGroup(groupName);
            }

            
return result;
        }

        
public boolean createGroup(String groupName)
        {
            
assert groupName != null;

            
if (!groupName.startsWith("/"))
            {
                groupName 
= "/" + groupName;
            }

            
try
            {
                Stat s 
= zk.exists(groupName, false);

                
if (s == null)
                {
                    zk.create(groupName, 
new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }
            
catch (Exception e)
            {
                log.error(
"create group error: " + groupName, e);
                
return false;
            }
            
return true;
        }

        
protected String buildName(String name)
        {
            
return group + "/" + name;
        }

        
public boolean joinGroup()
        {
            
return joinGroup(null);
        }

        
public boolean joinGroup(Integer port)
        {
            
try
            {
                
//use ipaddress as default, if you will use different ipaddress, you need joinGroup(yourip)
                me = InetAddress.getLocalHost().getHostAddress();
                
return joinGroupByDefine(me + ":" + port);
            }
            
catch (Exception e)
            {
                log.error(
"join group error", e);
                
return false;
            }
        }

        
public boolean joinGroupByDefine(String userdefine)
        {
            
assert userdefine != null;
            
assert userdefine.length() > 0;

            
try
            {
                me 
= userdefine;
                
if (me.contains("[host]"))
                {
                    String host 
= InetAddress.getLocalHost().getHostAddress();
                    me 
= me.replaceFirst("\\[host\\]", host);
                }

                
//if contains "/", how to deal?      --- maybe we need more format in future
                me = ESCAPE_PREFIX + URLEncoder.encode(me, "UTF-8");

                zk.create(buildName(me), 
new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            }
            
catch (Exception e)
            {
                log.error(
"join group error: " + me, e);
                
return false;
            }

            
return true;
        }

        
public void leaveGroup()
        {
            
try
            {
                zk.delete(buildName(me), 
0);
            }
            
catch (Exception e)
            {
                log.error(
"leave group error: " + me, e);
            }
        }

        
public List<String> fetchGroupMembers()
        {
            
return fetchGroupMembers(group, null);
        }

        
public List<String> fetchGroupMembers(String groupName)
        {
            
return fetchGroupMembers(groupName, null);
        }

        
public List<String> fetchGroupMembers(String groupName, Watcher watcher)
        {
            
if (groupName != null && groupName.length() > 0)
            {
                
if (!groupName.startsWith("/"))
                {
                    groupName 
= "/" + groupName;
                }
            }
            
else
            {
                
return EMPTY_MEMBERS;
            }

            
try
            {
                List
<String> childlist;
                
if(watcher == null)
                {
                    childlist 
= zk.getChildren(groupName, false);
                }
                
else
                {
                    childlist 
= zk.getChildren(groupName, watcher);
                }

                List
<String> lastresult = new ArrayList<String>();
                
for (String item : childlist)
                {
                    
if (item.startsWith(ESCAPE_PREFIX))
                    {
                        lastresult.add(URLDecoder.decode(item, 
"UTF-8").substring(3));
                    }
                    
else
                    {
                        lastresult.add(item);
                    }
                }

                
return lastresult;
            }
            
catch (Exception e)
            {
                log.error(
"fetch group members error", e);
                
return EMPTY_MEMBERS;
            }
        }
    }




    GroupMemberCenter主要是把用户的address信息做一下转义然后在ZooKeeper中创建一个节点, 注册时使用 CreateMode.EPHEMERAL 模式, 也就是类似心跳监测, 如果服务挂掉, 那么ZooKeeper会自动删除此节点.


    为了方便测试, 编写3个启动服务的程序来模拟多台机器, 启动的都是Hello服务, 只是端口不一样而已:

    public class HelloServiceServer5Zookeeper1 {
        
public static void main(String[] args) throws Exception {
            
new HelloServicePublisher5Zookeeper().start("http://localhost:8081/service/Hello"new HelloFirstImpl());
        }
    }



    其他2个请自己看源码包.

   
    下面我们来准备Client, 代码和上篇文章中的一样, 首先是一个XML:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jaxws
="http://cxf.apache.org/jaxws"
       xmlns:clustering
="http://cxf.apache.org/clustering"
       xmlns:util
="http://www.springframework.org/schema/util"
       xsi:schemaLocation
="
http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"
>


    
<bean id="loadBalanceStrategy" class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
        
<property name="removeFailedEndpoint" value="true" />
    
</bean>

    
<bean id="loadBalanceFeature" class="org.javascud.extensions.cxf.LoadBalanceFeature">
        
<property name="strategy" ref="loadBalanceStrategy" />
    
</bean>


    
<jaxws:client name="helloClient"
                  serviceClass
="org.javascud.extensions.cxf.service.Hello"            >
        
<jaxws:features>
            
<ref bean="loadBalanceFeature" />
        
</jaxws:features>
    
</jaxws:client>

    
<bean id="zooKeeper" class="org.apache.zookeeper.ZooKeeper">
        
<constructor-arg index="0" value="127.0.0.1:2181" />
        
<constructor-arg index="1" value="3000" />
        
<constructor-arg index="2" ><null/></constructor-arg>
    
</bean>
</beans>


    XML没有写任何服务的网址, 后面的程序负责更新服务列表. 此XML定义了一个ZooKeeper客户端, 你可以根据自己的实际情况修改, 例如ZooKeeper本身也可以是负载均衡的 (一般为3台服务器, 方便投票).

   
    调用的Java代码如下:



        ClassPathXmlApplicationContext context
                
= new ClassPathXmlApplicationContext(new String[]
                {
"org/javascud/extensions/cxf/zookeeper/client/loadbalance_fail_zookeeper.xml"});
        
final Hello client = (Hello) context.getBean("helloClient");

        
final AbstractLoadBalanceStrategy strategy = (AbstractLoadBalanceStrategy) context.getBean("loadBalanceStrategy");

        Client myclient 
= ClientProxy.getClient(client);
        String address 
= myclient.getEndpoint().getEndpointInfo().getAddress();
        System.out.println(address);


        ZooKeeper zk 
= (ZooKeeper) context.getBean("zooKeeper");
   
    
//使用定时刷新的方式更新服务列表: 实际代码中可以写一个单独的类来调用
        ServiceEndpointsFetcher fetcher = new ServiceEndpointsFetcher();
        fetcher.setStrategy(strategy);
        fetcher.setZooKeeper(zk);
        fetcher.setGroupName(groupName);
        fetcher.start();

    
//调用服务
        for (int i = 1; i <= 1000; i++) {
            String result1 
= client.sayHello("Felix" + i);
            System.out.println(
"Call " + i + "" + result1);

            
int left = strategy.getAlternateAddresses(null).size();
            System.out.println(
"================== left " + left + " ===========================");

            Thread.sleep(
100);
        }


    查看上面的代码可以发现, 我们使用了ServiceEndpointsFetcher来刷新, 间隔固定的时间去获取最新的服务列表.


    我们还可以采用观察者方式来更新服务列表:

/**
 * watcher service from zookeeper.
 *
 * 
@author Felix Zhang   Date:2010-10-16 01:13
 
*/
public class ServiceEndpointsWatcher extends ZooKeeperChildrenWatcher {

    
private AbstractLoadBalanceStrategy strategy;

    
public void setStrategy(AbstractLoadBalanceStrategy strategy) {
        
this.strategy = strategy;
    }

    @Override
    
protected void updateData(List<String> members) {
        strategy.setAlternateAddresses(members);
    }
}


    ZooKeeperChildrenWatcher是一个父类, 调用GroupMemberCenter的代码来监测ZooKeeper上的对应节点:

/**
 * a Watcher for monitor zookeeper by getChildren
 *
 * 
@author Felix Zhang   Date:2010-10-16 14:39
 
*/
public abstract class ZooKeeperChildrenWatcher implements Watcher {
    
private ZooKeeper zooKeeper;
    
private String groupName;
    
private GroupMemberCenter gmc = null;

    
public void setZooKeeper(ZooKeeper zooKeeper) {
        
this.zooKeeper = zooKeeper;
    }

    
public void setGroupName(String groupName) {
        
this.groupName = groupName;
    }

    @Override
    
public void process(WatchedEvent event) {
        fetchAndUpdate();
    }

    
private void fetchAndUpdate() {
        
//get children and register watcher again
        List<String> members = gmc.fetchGroupMembers(groupName, this);

        updateData(members);
    }

    
protected abstract void updateData(List<String> members);

    
public void init() {
        
if (zooKeeper != null) {
            gmc 
= new GroupMemberCenter();
            gmc.setZooKeeper(zooKeeper);

            fetchAndUpdate();
        }
    }
}
   

    调用ServiceEndpointsWatcher的代码是在Spring的XML中, 当然也可以在单独程序中调用:

   
    <bean id="serviceEndpointsWatcher"
          class
="org.javascud.extensions.cxf.zookeeper.ServiceEndpointsWatcher"
            init-method
="init">
        
<property name="strategy" ref="loadBalanceStrategy" />
        
<property name="zooKeeper" ref="zooKeeper" />
        
<property name="groupName" value="helloservice" />
    
</bean>


   
    ok, 下面我们启动ZooKeeper, 在2181端口. 然后其次启动三个HelloService: HelloServiceServer5Zookeeper1, HelloServiceServer5Zookeeper2, HelloServiceServer5Zookeeper3, 它们分别监测在8081, 8082, 8083端口, 并且会向ZooKeeper注册, 你查看用ZooKeeper的客户端查看 ls /helloservice, 应该可以看到三个节点.

    然后我们运行客户端代码 HelloClient5Zookeeper, 在客户端运行的过程中, 我们可以终止/启动HelloService, 就可以看到我们的程序会动态地访问不同的HelloService, 达到了负载均衡的目的.






    注: ServiceEndpointsWatcher 或ServiceEndpointsFetcher 一定现行运行, 否则调用服务的部分会抛出异常, 因为没有可用的服务地址.


代码打包下载: http://cnscud.googlecode.com/files/extensions-20101016.zip
SVN 源码位置: http://cnscud.googlecode.com/svn/trunk/extensions/ 


转载请注明作者和出处 http://scud.blogjava.net
   


posted on 2010-10-16 19:37 Scud(飞云小侠) 阅读(4725) 评论(1)  编辑  收藏 所属分类: SOA

评论

# re: 使用ZooKeeper为CXF或其他服务动态更新服务器信息 2012-09-07 15:32 zdonking

吼吼,没想到飞哥 业余还研究代码  回复  更多评论   


只有注册用户登录后才能发表评论。


网站导航:
 
<2010年10月>
262728293012
3456789
10111213141516
17181920212223
24252627282930
31123456

导航

统计

公告

文章发布许可
创造共用协议:署名,非商业,保持一致

我的邮件
cnscud # gmail


常用链接

留言簿(14)

随笔分类(110)

随笔档案(102)

相册

友情链接

技术网站

搜索

积分与排名

最新评论

阅读排行榜

评论排行榜