上善若水
In general the OO style is to use a lot of little objects with a lot of little methods that give us a lot of plug points for overriding and variation. To do is to be -Nietzsche, To bei is to do -Kant, Do be do be do -Sinatra
posts - 146,comments - 147,trackbacks - 0

前言

在《ReferenceCountSet无锁实现》中,详细介绍了如何在一个进程中实现一个无锁版本的ReferenceCountSet(或者说是在自己的代码里没有锁),但是最近遇到一个问题,如果是在分布式的环境中呢?如何实现这个引用计数?这个问题如果从头开始写,会是一个比较复杂的问题,在实际中,我们可以使用ZooKeeper设置时的version机制来实现,即CAS(Compare-And-Set)。这是一个本人在实际项目中遇到的一个问题,但是会更简单一些,因为在我们的项目中,我们使用GemFire,即已经有一个现成的分布式Map了(在Gemfire中叫做Region),所以问题简化成如果如何使用一个分布式Map实现引用计数?

实现

如果对ConcurrentMap接口比较熟悉的话,这个其实是一个比较简单的问题。在ConcurrentMap中最主要的就是引入几个CAS相关的操作:
public interface ConcurrentMap<K, V> extends Map<K, V> {
    V putIfAbsent(K key, V value);
    boolean remove(Object key, Object value);
    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);
}
在《ReferenceCountSet无锁实现》中我们只需要使用putIfAbsent就可以了,剩下的实现可以交给AtomicInteger提供的CAS来实现,因为它是在同一个进程中,但是如果在分布式的环境中就不能使用这个AtomicInteger,这个时候应该怎么办呢?其实这个时候我们就可以求助于replace方法了。replace方法的注释中这样描述:
    /**
     * Replaces the entry for a key only if currently mapped to a given value.
     * This is equivalent to
     * <pre>
     *   if (map.containsKey(key) &amp;&amp; map.get(key).equals(oldValue)) {
     *       map.put(key, newValue);
     *       return true;
     *   } else return false;</pre>
     * except that the action is performed atomically.
     *
     * 
@param key key with which the specified value is associated
     * 
@param oldValue value expected to be associated with the specified key
     * 
@param newValue value to be associated with the specified key
     * 
@return <tt>true</tt> if the value was replaced
     * 
@throws UnsupportedOperationException if the <tt>put</tt> operation
     *         is not supported by this map
     * 
@throws ClassCastException if the class of a specified key or value
     *         prevents it from being stored in this map
     * 
@throws NullPointerException if a specified key or value is null,
     *         and this map does not permit null keys or values
     * 
@throws IllegalArgumentException if some property of a specified key
     *         or value prevents it from being stored in this map
     
*/
    boolean replace(K key, V oldValue, V newValue);

在ConcurrentMap的value中我们只需要给Integer,然后用replace去不断的尝试,即自己实现一个CAS:
private int incrementRefCount(Object key) {
    do {
        Integer curCount = distributedMap.get(key);
        if (curCount == null) {
            curCount = distributedMap.putIfAbsent(key, new Integer(1));
            if (curCount == null) {
                return 1;
            }
        }
        
        Integer newCount = new Integer(curCount.intValue() + 1);
        if (distributedMap.replace(key, curCount, newCount)) {
            return newCount;
        }
    } while (true);
}

主要逻辑就是这样了,其实比较简单,只是之前没有遇到过这个问题,所以感觉可以记录下来。或许什么时候补充一下ZooKeeper版本的实现。
posted on 2015-04-20 20:30 DLevin 阅读(5456) 评论(0)  编辑  收藏 所属分类: Core Java

只有注册用户登录后才能发表评论。


网站导航: