当柳上原的风吹向天际的时候...

真正的快乐来源于创造

  BlogJava :: 首页 :: 联系 :: 聚合  :: 管理
  368 Posts :: 1 Stories :: 201 Comments :: 0 Trackbacks
注:以下Cache未加时效处理,是比较简陋的Cache方案。

AbstractCacheMap基类,用来定义和限制子类的操作:
package cachemap.base;

/**
 * Parent class of InnerCacheMap & OutterCacheMap
 * 
 * 
@author heyang
 * @time Sep 23, 2011,4:00:45 PM
 
*/
public abstract class AbstractCacheMap {
    
// Read-Write lock,for protecting the cacheMap in the Multi-thread environment
    private ReadWriteLock lock;
    
    
/**
     * Contructor
     
*/
    
public AbstractCacheMap() {
        lock 
= new ReadWriteLock();
    }
    
    
/**
     * Put key-value pair into cacheMap
     * It can be called by any class
     * 
@param key
     * 
@param value
     * 
@throws Exception
     
*/
    
public void writeData(String key,byte[] value) throws Exception{
        
try {
            lock.writeLock();
            set(key,value);
        } 
finally {
            lock.writeUnlock();
        }
    }
    
    
/**
     * Put key-value pair into cacheMap,force child-class to implement.
     * It only can be called by child-class
     * 
@param key
     * 
@param value
     
*/
    
protected abstract void set(String key,byte[] value) throws Exception;

    
/**
     * Get value by it's key
     * It can be called by any class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
public byte[] readData(String key) throws Exception{
        
try {
            lock.readLock();
            
return get(key);
        } 
finally {
            lock.readUnlock();
        }
    }
    
    
/**
     * Get value by it's key,force child-class to implement.
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract byte[] get(String key) throws Exception;
    
    
/**
     * Judge the existence of a key
     * It can be called by any class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
public boolean containsKey(String key) throws Exception{
        
try {
            lock.readLock();
            
return contains(key);
        } 
finally {
            lock.readUnlock();
        }
    }
    
    
/**
     * Judge the existence of a key,force child-class to implement.
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract boolean contains(String key) throws Exception;
    
    
/**
     * Remove a key-value pair from cacheMap by it's key
     * It can be called by any class
     * 
     * 
@param key
     * 
@throws Exception
     
*/
    
public void removeData(String key) throws Exception{
        
try {
            lock.writeLock();
            remove(key);
        } 
finally {
            lock.writeUnlock();
        }
    }
    
    
/**
     * Remove a key-value pair from cacheMap by it's key
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract void remove(String key) throws Exception;
    
    
/**
     * Remove all data in the cacheMap
     * It can be called by any class
     * 
     * 
@throws Exception
     
*/
    
public void removeAllData() throws Exception{
        
try {
            lock.writeLock();
            removeAll();
        } 
finally {
            lock.writeUnlock();
        }
    }
    
    
/**
     * Remove all data in the cacheMap
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract void removeAll() throws Exception;
}
与AbstractCacheMap类配合使用,用来防止读写线程冲突和线程拥挤的读写锁类:
package cachemap.base;


/**
 * Parent class of InnerCacheMap & OutterCacheMap
 * 
 * 
@author heyang
 * @time Sep 23, 2011,4:00:45 PM
 
*/
public abstract class AbstractCacheMap {
    
// Read-Write lock,for protecting the cacheMap in the Multi-thread environment
    private ReadWriteLock lock;
    
    
/**
     * Contructor
     
*/
    
public AbstractCacheMap() {
        lock 
= new ReadWriteLock();
    }
    
    
/**
     * Put key-value pair into cacheMap
     * It can be called by any class
     * 
@param key
     * 
@param value
     * 
@throws Exception
     
*/
    
public void writeData(String key,byte[] value) throws Exception{
        
try {
            lock.writeLock();
            set(key,value);
        } 
finally {
            lock.writeUnlock();
        }
    }
    
    
/**
     * Put key-value pair into cacheMap,force child-class to implement.
     * It only can be called by child-class
     * 
@param key
     * 
@param value
     
*/
    
protected abstract void set(String key,byte[] value) throws Exception;

    
/**
     * Get value by it's key
     * It can be called by any class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
public byte[] readData(String key) throws Exception{
        
try {
            lock.readLock();
            
return get(key);
        } 
finally {
            lock.readUnlock();
        }
    }
    
    
/**
     * Get value by it's key,force child-class to implement.
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract byte[] get(String key) throws Exception;
    
    
/**
     * Judge the existence of a key
     * It can be called by any class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
public boolean containsKey(String key) throws Exception{
        
try {
            lock.readLock();
            
return contains(key);
        } 
finally {
            lock.readUnlock();
        }
    }
    
    
/**
     * Judge the existence of a key,force child-class to implement.
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract boolean contains(String key) throws Exception;
    
    
/**
     * Remove a key-value pair from cacheMap by it's key
     * It can be called by any class
     * 
     * 
@param key
     * 
@throws Exception
     
*/
    
public void removeData(String key) throws Exception{
        
try {
            lock.writeLock();
            remove(key);
        } 
finally {
            lock.writeUnlock();
        }
    }
    
    
/**
     * Remove a key-value pair from cacheMap by it's key
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract void remove(String key) throws Exception;
    
    
/**
     * Remove all data in the cacheMap
     * It can be called by any class
     * 
     * 
@throws Exception
     
*/
    
public void removeAllData() throws Exception{
        
try {
            lock.writeLock();
            removeAll();
        } 
finally {
            lock.writeUnlock();
        }
    }
    
    
/**
     * Remove all data in the cacheMap
     * It only can be called by child-class
     * 
     * 
@param key
     * 
@return
     * 
@throws Exception
     
*/
    
protected abstract void removeAll() throws Exception;
}

用来往CacheMap中异步添加值的CacheMapSetter类:
package cachemap.setter;

import cachemap.base.AbstractCacheMap;


/**
 * CacheMapSetter
 * It's use is to set a key-value pair into cacheMap
 * 
 * 
@author heyang
 * @time Sep 26, 2011,10:11:36 AM
 
*/
public final class CacheMapSetter implements Runnable{
    
// The reference to the cacheMap
    private AbstractCacheMap cacheMap;
    
private String key;
    
private byte[] value;
    
    
/**
     * Constuctor
     * 
@param cacheMap
     * 
@param key
     * 
@param value
     
*/
    
public CacheMapSetter(AbstractCacheMap cacheMap,String key,byte[] value){
        
this.cacheMap=cacheMap;
        
this.key=key;
        
this.value=value;
        
        
new Thread(this).start();
    }

    @Override
    
public void run(){
        
try{
            cacheMap.writeData(key, value);        
        }
        
catch(Exception ex){
            ex.printStackTrace();
        }
    }
}
AbstractCacheMap的实际子类InnerCacheMap,存储空间使用的是内置的HashMap:
package cachemap;

import java.util.HashMap;
import java.util.Map;

import cachemap.base.AbstractCacheMap;

/**
 * CacheMap used local HashMap
 * 
 * 
@author heyang
 * @time Sep 23, 2011,3:48:17 PM
 
*/
public class InnerCacheMap extends AbstractCacheMap{
    
// essential storage
    private Map<String,byte[]> map;
    
    
/**
     * Contructor
     
*/
    
public InnerCacheMap(){
        
super();
        map
=new HashMap<String,byte[]>();
    }

    @Override
    
protected byte[] get(String key) throws Exception {
        
return map.get(key);
    }

    @Override
    
protected void set(String key, byte[] value) throws Exception {
        map.put(key, value);
    }

    @Override
    
protected boolean contains(String key) throws Exception {
        
return map.containsKey(key);
    }

    @Override
    
protected void remove(String key) throws Exception {
        map.remove(key);
    }

    @Override
    
protected void removeAll() throws Exception {
        map.clear();
    }
}
AbstractCacheMap的子类WxsCacheMap,使用的是外部的WebSphere Extreme Scale作为存储空间:
package cachemap;

import cachemap.base.AbstractCacheMap;

import com.devwebsphere.wxsutils.WXSMap;
import com.devwebsphere.wxsutils.WXSUtils;

/**
 * CacheMap powered by WebSphere eXtreme Scale
 * 
 * 
@author heyang
 * @time Sep 23, 2011,3:47:54 PM
 
*/
public class WxsCacheMap extends AbstractCacheMap{
    
// essential storage
    private WXSMap<String, byte[]> wxsMap;
    
    
/**
     * Contructor
     
*/
    
public WxsCacheMap(String host,String grid,String businessObjectName){
        
super();
        
        
try{
            WXSUtils wxsUtils 
= WxsConnection.getWxsConnection(host,grid);
            wxsMap
=getMap(wxsUtils,businessObjectName);
        }
catch(Exception ex){
            ex.printStackTrace();
        }
    }
    
    
/**
     * Get map from WXSUtils
     * 
@param wxsUtils
     * 
@param strBusinessObject
     * 
@return
     
*/
    
private WXSMap<String, byte[]> getMap(WXSUtils wxsUtils, String strBusinessObject) {
        
if (strBusinessObject.equalsIgnoreCase("CUSTOMER")){
            
return wxsUtils.getCache("BO_CUSTOMER");
        }
else if (strBusinessObject.equalsIgnoreCase("VEHICLE")){
            
return wxsUtils.getCache("BO_VEHICLE");
        }
else if (strBusinessObject.equalsIgnoreCase("NAVIGATION")){
            
return  wxsUtils.getCache("NAV_NAVIGATION");
        }
else{
            
return null;
        }
    }
    
    
    @Override
    
protected byte[] get(String key) throws Exception {
        
return wxsMap.get(key);
    }

    @Override
    
protected void set(String key, byte[] value) throws Exception {
        wxsMap.put(key, value);
    }

    @Override
    
protected boolean contains(String key) throws Exception {
        
return wxsMap.contains(key);
    }

    @Override
    
protected void remove(String key) throws Exception {
        wxsMap.remove(key);
    }

    @Override
    
protected void removeAll() throws Exception {
        wxsMap.clear();
    }
}
与WxsCacheMap配合使用的WxsConnection类:
package cachemap;

import com.devwebsphere.wxsutils.WXSUtils;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
import com.ibm.websphere.objectgrid.TargetNotAvailableException;

public class WxsConnection {
    
static WXSUtils utils;
    
static ObjectGrid grid;

    
/**
     * 
@param strCatalogServerEndPoint: Catalog Server End Point example: localhost:2809
     * 
@param strGrid: Grid name
     * 
@return
     
*/
    
public static synchronized WXSUtils getWxsConnection(String strCatalogServerEndPoint,String strGrid) {
        
if (utils == null) {
            
            grid 
= WXSUtils.connectClient(strCatalogServerEndPoint,strGrid);  
            utils 
= new WXSUtils(grid);
        }
        
return utils;
    }

    
public static void handleException(ObjectGridRuntimeException e) {
        Throwable ne 
= e.getCause();
        
if (ne instanceof TargetNotAvailableException) {
            
// bad target (maybe ip changed)
            WXSUtils oldUtils;
            
synchronized (WxsConnection.class) {
                oldUtils 
= utils;
                utils 
= null;
            }
            oldUtils.getObjectGrid().destroy();
        }
    }
}

用来从外界得到CacheMap的CacheFactory类:
package cachemap;

import cachemap.base.AbstractCacheMap;

/**
 * CacheMapFacory
 * It' used to get the actual cacheMap
 * 
 * 
@author heyang
 * @time Sep 26, 2011,10:41:39 AM
 
*/
public final class CacheMapFacory{
    
/**
     * getInnerCacheMap
     * 
@return
     * 
@throws Exception
     
*/
    
public static AbstractCacheMap getInnerCacheMap() throws Exception{
        
return new InnerCacheMap();
    }
    
    
/**
     * getWxsCacheMap
     * 
@return
     * 
@throws Exception
     
*/
    
public static AbstractCacheMap getWxsCacheMap() throws Exception{
        
return new WxsCacheMap("host","grid","VEHICLE");
    }
}

JavaCompute节点中使用Cache的Java代码:
import cachemap.CacheMapFacory;
import cachemap.base.AbstractCacheMap;
import cachemap.setter.CacheMapSetter;

import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;

/**
 * subflow_JavaCompute class
 * 
 * 
@author heyang
 * @time Sep 26, 2011,10:14:34 AM
 
*/
public class subflow_JavaCompute extends MbJavaComputeNode {
    
private static AbstractCacheMap cacheMap;
    
static{
        
try {
            cacheMap
=CacheMapFacory.getInnerCacheMap();
        } 
catch (Exception e) {
            e.printStackTrace();
        }
    }

    
public void evaluate(MbMessageAssembly inAssembly) throws MbException {
        
/****************************
         * 1.out&alt
         **************************
*/
        MbOutputTerminal out 
= getOutputTerminal("out");
        
//MbOutputTerminal alt = getOutputTerminal("alternate");
        
        
/****************************
         * 2.inMsg& outMsg
         **************************
*/
        MbMessage inMsg 
= inAssembly.getMessage();
        MbMessage outMsg 
= new MbMessage();
        
        
/****************************
         * 3.input&output environment
         **************************
*/
        MbMessage inputEnv
=inAssembly.getLocalEnvironment();
        MbMessage outputEnv
=new MbMessage(inAssembly.getLocalEnvironment());
        
        
/****************************
         * 4.outAssembly
         **************************
*/
        MbMessageAssembly outAssembly
=new MbMessageAssembly(inAssembly,outputEnv,inAssembly.getExceptionList(),outMsg);
        
        
try{
            
// Copy Headers
            copyMessageHeaders(inMsg,outMsg);
            
            
// Get key
            String key=getKey(inputEnv);
            
            
// Get operation
            String operation=getOperation(inputEnv);
            
            
            
if("READ".equalsIgnoreCase(operation)){
                
if(cacheMap.containsKey(key)){
                    
byte[] value=cacheMap.readData(key);
                    
                    
// TODO:
                    MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                    MbElement elementValue 
= elementOutputVariables.getFirstElementByPath("Value");
                    
if(elementValue==null){
                        elementValue 
= elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Value",    "");
                    }
                    
                    elementValue.setValue(value);
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    
"Status",    "SUCCESS");
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    
"Message",    "CACHE Record Found");    
                }
else{
                    
// TODO:
                    MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    
"Status",    "FAILURE");
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, 
"Message",    "No Record Found");    
                }
            }
else if("WRITE".equalsIgnoreCase(operation)){
                
if(cacheMap.containsKey(key)){
                    cacheMap.removeData(key);
                }    
                
                
byte[] value=getValue(outputEnv);
                
new CacheMapSetter(cacheMap,key, value);
                
                
// TODO:
                MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                MbElement elementValue 
= elementOutputVariables.getFirstElementByPath("Value");
                
if(elementValue==null){
                    elementValue 
= elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Value",    "");
                }
                elementValue.detach();
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    
"Status",    "SUCCESS");
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, 
"Message",    "Record Added to Cache");
                
            }
else if("REMOVEALL".equalsIgnoreCase(operation)){
                cacheMap.removeAllData();
                
                
// TODO:
                MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                MbElement elementValue 
= elementOutputVariables.getFirstElementByPath("Value");
                
if(elementValue==null){
                    elementValue 
= elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Value",    "");
                }
                elementValue.detach();
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    
"Status",    "SUCCESS");
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, 
"Message",    "Rmoved all data from Cache");
            }
else{
                
throw new Exception("Unknown Operation:"+operation);
            }
            
            
// Send Back
            out.propagate(outAssembly);
        }
        
catch(Exception ex){
            ex.printStackTrace();
        }
    }
    
    
//private 
    
    
/**
     * get operation
     * get operation from input environment
     * 
     * 
@param inputEnv
     * 
@return
     * 
@throws Exception
     
*/
    
private String getOperation(MbMessage inputEnv) throws Exception{
        MbElement elementInputEnvironment 
= inputEnv.getRootElement();
        MbElement elementOperation 
= elementInputEnvironment.getFirstElementByPath("Variables/Operation");
        String strOperation 
= elementOperation.getValueAsString();
        
        
return strOperation;
    }
    
    
/**
     * getKey
     * get key from input environment
     * 
     * 
@param inputEnv
     * 
@return
     * 
@throws Exception
     
*/
    
private String getKey(MbMessage inputEnv) throws Exception{
        MbElement elementInputEnvironment 
= inputEnv.getRootElement();
        MbElement elementKey 
= elementInputEnvironment.getFirstElementByPath("Variables/BOKey");
        String strKey 
= elementKey.getValueAsString();
        
        
return strKey;
    }
    
    
/**
     * getValue
     * get value from output environment
     * 
     * 
@param outputEnv
     * 
@return
     * 
@throws Exception
     
*/
    
private byte[] getValue(MbMessage outputEnv) throws Exception{
        MbElement elementOutputVariables 
= outputEnv.getRootElement().getFirstElementByPath("Variables");
        MbElement elementValue 
= elementOutputVariables.getFirstElementByPath("Value");
        
byte[] byteInputObject = (byte[])elementValue.getValue();
        
        
return byteInputObject;
    }

    
/**
     * copyMessageHeaders
     * 
@param inMessage
     * 
@param outMessage
     * 
@throws MbException
     
*/
    
private void copyMessageHeaders(MbMessage inMessage, MbMessage outMessage) throws MbException {
            MbElement outRoot 
= outMessage.getRootElement();

        
// iterate though the headers starting with the first child of the root element
        MbElement header = inMessage.getRootElement().getFirstChild();
        
while (header != null && header.getNextSibling() != null// stop before the last child of the body
        {
            
// copy the header and add it to the out message
            outRoot.addAsLastChild(header.copy());
            
            
// move along to next header
            header = header.getNextSibling();
        }        
    }
}

源码下载:
http://www.blogjava.net/Files/heyang/SimpleCacheInJavaComputeNode_01.rar
posted on 2011-09-27 10:00 何杨 阅读(968) 评论(0)  编辑  收藏 所属分类: WMB

只有注册用户登录后才能发表评论。


网站导航: