注:以下Cache未加时效处理,是比较简陋的Cache方案。
AbstractCacheMap基类,用来定义和限制子类的操作:
package cachemap.base;
/**
 * Parent class of InnerCacheMap & OutterCacheMap
 * 
 * @author heyang
 * @time Sep 23, 2011,4:00:45 PM
 */
public abstract class AbstractCacheMap {
    // Read-Write lock,for protecting the cacheMap in the Multi-thread environment
    private ReadWriteLock lock;
    
    /**
     * Contructor
     */
    public AbstractCacheMap() {
        lock = new ReadWriteLock();
    }
    
    /**
     * Put key-value pair into cacheMap
     * It can be called by any class
     * @param key
     * @param value
     * @throws Exception
     */
    public void writeData(String key,byte[] value) throws Exception{
        try {
            lock.writeLock();
            set(key,value);
        } finally {
            lock.writeUnlock();
        }
    }
    
    /**
     * Put key-value pair into cacheMap,force child-class to implement.
     * It only can be called by child-class
     * @param key
     * @param value
     */
    protected abstract void set(String key,byte[] value) throws Exception;
    /**
     * Get value by it's key
     * It can be called by any class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    public byte[] readData(String key) throws Exception{
        try {
            lock.readLock();
            return get(key);
        } finally {
            lock.readUnlock();
        }
    }
    
    /**
     * Get value by it's key,force child-class to implement.
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract byte[] get(String key) throws Exception;
    
    /**
     * Judge the existence of a key
     * It can be called by any class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    public boolean containsKey(String key) throws Exception{
        try {
            lock.readLock();
            return contains(key);
        } finally {
            lock.readUnlock();
        }
    }
    
    /**
     * Judge the existence of a key,force child-class to implement.
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract boolean contains(String key) throws Exception;
    
    /**
     * Remove a key-value pair from cacheMap by it's key
     * It can be called by any class
     * 
     * @param key
     * @throws Exception
     */
    public void removeData(String key) throws Exception{
        try {
            lock.writeLock();
            remove(key);
        } finally {
            lock.writeUnlock();
        }
    }
    
    /**
     * Remove a key-value pair from cacheMap by it's key
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract void remove(String key) throws Exception;
    
    /**
     * Remove all data in the cacheMap
     * It can be called by any class
     * 
     * @throws Exception
     */
    public void removeAllData() throws Exception{
        try {
            lock.writeLock();
            removeAll();
        } finally {
            lock.writeUnlock();
        }
    }
    
    /**
     * Remove all data in the cacheMap
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract void removeAll() throws Exception;
}
与AbstractCacheMap类配合使用,用来防止读写线程冲突和线程拥挤的读写锁类:
package cachemap.base;
/**
 * Parent class of InnerCacheMap & OutterCacheMap
 * 
 * @author heyang
 * @time Sep 23, 2011,4:00:45 PM
 */
public abstract class AbstractCacheMap {
    // Read-Write lock,for protecting the cacheMap in the Multi-thread environment
    private ReadWriteLock lock;
    
    /**
     * Contructor
     */
    public AbstractCacheMap() {
        lock = new ReadWriteLock();
    }
    
    /**
     * Put key-value pair into cacheMap
     * It can be called by any class
     * @param key
     * @param value
     * @throws Exception
     */
    public void writeData(String key,byte[] value) throws Exception{
        try {
            lock.writeLock();
            set(key,value);
        } finally {
            lock.writeUnlock();
        }
    }
    
    /**
     * Put key-value pair into cacheMap,force child-class to implement.
     * It only can be called by child-class
     * @param key
     * @param value
     */
    protected abstract void set(String key,byte[] value) throws Exception;
    /**
     * Get value by it's key
     * It can be called by any class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    public byte[] readData(String key) throws Exception{
        try {
            lock.readLock();
            return get(key);
        } finally {
            lock.readUnlock();
        }
    }
    
    /**
     * Get value by it's key,force child-class to implement.
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract byte[] get(String key) throws Exception;
    
    /**
     * Judge the existence of a key
     * It can be called by any class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    public boolean containsKey(String key) throws Exception{
        try {
            lock.readLock();
            return contains(key);
        } finally {
            lock.readUnlock();
        }
    }
    
    /**
     * Judge the existence of a key,force child-class to implement.
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract boolean contains(String key) throws Exception;
    
    /**
     * Remove a key-value pair from cacheMap by it's key
     * It can be called by any class
     * 
     * @param key
     * @throws Exception
     */
    public void removeData(String key) throws Exception{
        try {
            lock.writeLock();
            remove(key);
        } finally {
            lock.writeUnlock();
        }
    }
    
    /**
     * Remove a key-value pair from cacheMap by it's key
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract void remove(String key) throws Exception;
    
    /**
     * Remove all data in the cacheMap
     * It can be called by any class
     * 
     * @throws Exception
     */
    public void removeAllData() throws Exception{
        try {
            lock.writeLock();
            removeAll();
        } finally {
            lock.writeUnlock();
        }
    }
    
    /**
     * Remove all data in the cacheMap
     * It only can be called by child-class
     * 
     * @param key
     * @return
     * @throws Exception
     */
    protected abstract void removeAll() throws Exception;
}
用来往CacheMap中异步添加值的CacheMapSetter类:
package cachemap.setter;
import cachemap.base.AbstractCacheMap;
/**
 * CacheMapSetter
 * It's use is to set a key-value pair into cacheMap
 * 
 * @author heyang
 * @time Sep 26, 2011,10:11:36 AM
 */
public final class CacheMapSetter implements Runnable{
    // The reference to the cacheMap
    private AbstractCacheMap cacheMap;
    private String key;
    private byte[] value;
    
    /**
     * Constuctor
     * @param cacheMap
     * @param key
     * @param value
     */
    public CacheMapSetter(AbstractCacheMap cacheMap,String key,byte[] value){
        this.cacheMap=cacheMap;
        this.key=key;
        this.value=value;
        
        new Thread(this).start();
    }
    @Override
    public void run(){
        try{
            cacheMap.writeData(key, value);        
        }
        catch(Exception ex){
            ex.printStackTrace();
        }
    }
}
AbstractCacheMap的实际子类InnerCacheMap,存储空间使用的是内置的HashMap:
package cachemap;
import java.util.HashMap;
import java.util.Map;
import cachemap.base.AbstractCacheMap;
/**
 * CacheMap used local HashMap
 * 
 * @author heyang
 * @time Sep 23, 2011,3:48:17 PM
 */
public class InnerCacheMap extends AbstractCacheMap{
    // essential storage
    private Map<String,byte[]> map;
    
    /**
     * Contructor
     */
    public InnerCacheMap(){
        super();
        map=new HashMap<String,byte[]>();
    }
    @Override
    protected byte[] get(String key) throws Exception {
        return map.get(key);
    }
    @Override
    protected void set(String key, byte[] value) throws Exception {
        map.put(key, value);
    }
    @Override
    protected boolean contains(String key) throws Exception {
        return map.containsKey(key);
    }
    @Override
    protected void remove(String key) throws Exception {
        map.remove(key);
    }
    @Override
    protected void removeAll() throws Exception {
        map.clear();
    }
}
AbstractCacheMap的子类WxsCacheMap,使用的是外部的WebSphere Extreme Scale作为存储空间:
package cachemap;
import cachemap.base.AbstractCacheMap;
import com.devwebsphere.wxsutils.WXSMap;
import com.devwebsphere.wxsutils.WXSUtils;
/**
 * CacheMap powered by WebSphere eXtreme Scale
 * 
 * @author heyang
 * @time Sep 23, 2011,3:47:54 PM
 */
public class WxsCacheMap extends AbstractCacheMap{
    // essential storage
    private WXSMap<String, byte[]> wxsMap;
    
    /**
     * Contructor
     */
    public WxsCacheMap(String host,String grid,String businessObjectName){
        super();
        
        try{
            WXSUtils wxsUtils = WxsConnection.getWxsConnection(host,grid);
            wxsMap=getMap(wxsUtils,businessObjectName);
        }catch(Exception ex){
            ex.printStackTrace();
        }
    }
    
    /**
     * Get map from WXSUtils
     * @param wxsUtils
     * @param strBusinessObject
     * @return
     */
    private WXSMap<String, byte[]> getMap(WXSUtils wxsUtils, String strBusinessObject) {
        if (strBusinessObject.equalsIgnoreCase("CUSTOMER")){
            return wxsUtils.getCache("BO_CUSTOMER");
        }else if (strBusinessObject.equalsIgnoreCase("VEHICLE")){
            return wxsUtils.getCache("BO_VEHICLE");
        }else if (strBusinessObject.equalsIgnoreCase("NAVIGATION")){
            return  wxsUtils.getCache("NAV_NAVIGATION");
        }else{
            return null;
        }
    }
    
    
    @Override
    protected byte[] get(String key) throws Exception {
        return wxsMap.get(key);
    }
    @Override
    protected void set(String key, byte[] value) throws Exception {
        wxsMap.put(key, value);
    }
    @Override
    protected boolean contains(String key) throws Exception {
        return wxsMap.contains(key);
    }
    @Override
    protected void remove(String key) throws Exception {
        wxsMap.remove(key);
    }
    @Override
    protected void removeAll() throws Exception {
        wxsMap.clear();
    }
}
与WxsCacheMap配合使用的WxsConnection类:
package cachemap;
import com.devwebsphere.wxsutils.WXSUtils;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
import com.ibm.websphere.objectgrid.TargetNotAvailableException;
public class WxsConnection {
    static WXSUtils utils;
    static ObjectGrid grid;
    /**
     * @param strCatalogServerEndPoint: Catalog Server End Point example: localhost:2809
     * @param strGrid: Grid name
     * @return
     */
    public static synchronized WXSUtils getWxsConnection(String strCatalogServerEndPoint,String strGrid) {
        if (utils == null) {
            
            grid = WXSUtils.connectClient(strCatalogServerEndPoint,strGrid);  
            utils = new WXSUtils(grid);
        }
        return utils;
    }
    public static void handleException(ObjectGridRuntimeException e) {
        Throwable ne = e.getCause();
        if (ne instanceof TargetNotAvailableException) {
            // bad target (maybe ip changed)
            WXSUtils oldUtils;
            synchronized (WxsConnection.class) {
                oldUtils = utils;
                utils = null;
            }
            oldUtils.getObjectGrid().destroy();
        }
    }
}
用来从外界得到CacheMap的CacheFactory类:
package cachemap;
import cachemap.base.AbstractCacheMap;
/**
 * CacheMapFacory
 * It' used to get the actual cacheMap
 * 
 * @author heyang
 * @time Sep 26, 2011,10:41:39 AM
 */
public final class CacheMapFacory{
    /**
     * getInnerCacheMap
     * @return
     * @throws Exception
     */
    public static AbstractCacheMap getInnerCacheMap() throws Exception{
        return new InnerCacheMap();
    }
    
    /**
     * getWxsCacheMap
     * @return
     * @throws Exception
     */
    public static AbstractCacheMap getWxsCacheMap() throws Exception{
        return new WxsCacheMap("host","grid","VEHICLE");
    }
}
JavaCompute节点中使用Cache的Java代码:
import cachemap.CacheMapFacory;
import cachemap.base.AbstractCacheMap;
import cachemap.setter.CacheMapSetter;
import com.ibm.broker.javacompute.MbJavaComputeNode;
import com.ibm.broker.plugin.MbElement;
import com.ibm.broker.plugin.MbException;
import com.ibm.broker.plugin.MbMessage;
import com.ibm.broker.plugin.MbMessageAssembly;
import com.ibm.broker.plugin.MbOutputTerminal;
/**
 * subflow_JavaCompute class
 * 
 * @author heyang
 * @time Sep 26, 2011,10:14:34 AM
 */
public class subflow_JavaCompute extends MbJavaComputeNode {
    private static AbstractCacheMap cacheMap;
    static{
        try {
            cacheMap=CacheMapFacory.getInnerCacheMap();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void evaluate(MbMessageAssembly inAssembly) throws MbException {
        /****************************
         * 1.out&alt
         ***************************/
        MbOutputTerminal out = getOutputTerminal("out");
        //MbOutputTerminal alt = getOutputTerminal("alternate");
        
        /****************************
         * 2.inMsg& outMsg
         ***************************/
        MbMessage inMsg = inAssembly.getMessage();
        MbMessage outMsg = new MbMessage();
        
        /****************************
         * 3.input&output environment
         ***************************/
        MbMessage inputEnv=inAssembly.getLocalEnvironment();
        MbMessage outputEnv=new MbMessage(inAssembly.getLocalEnvironment());
        
        /****************************
         * 4.outAssembly
         ***************************/
        MbMessageAssembly outAssembly=new MbMessageAssembly(inAssembly,outputEnv,inAssembly.getExceptionList(),outMsg);
        
        try{
            // Copy Headers
            copyMessageHeaders(inMsg,outMsg);
            
            // Get key
            String key=getKey(inputEnv);
            
            // Get operation
            String operation=getOperation(inputEnv);
            
            
            if("READ".equalsIgnoreCase(operation)){
                if(cacheMap.containsKey(key)){
                    byte[] value=cacheMap.readData(key);
                    
                    // TODO:
                    MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                    MbElement elementValue = elementOutputVariables.getFirstElementByPath("Value");
                    if(elementValue==null){
                        elementValue = elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Value",    "");
                    }
                    
                    elementValue.setValue(value);
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Status",    "SUCCESS");
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Message",    "CACHE Record Found");    
                }else{
                    // TODO:
                    MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Status",    "FAILURE");
                    elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Message",    "No Record Found");    
                }
            }else if("WRITE".equalsIgnoreCase(operation)){
                if(cacheMap.containsKey(key)){
                    cacheMap.removeData(key);
                }    
                
                byte[] value=getValue(outputEnv);
                new CacheMapSetter(cacheMap,key, value);
                
                // TODO:
                MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                MbElement elementValue = elementOutputVariables.getFirstElementByPath("Value");
                if(elementValue==null){
                    elementValue = elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Value",    "");
                }
                elementValue.detach();
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Status",    "SUCCESS");
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Message",    "Record Added to Cache");
                
            }else if("REMOVEALL".equalsIgnoreCase(operation)){
                cacheMap.removeAllData();
                
                // TODO:
                MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
                MbElement elementValue = elementOutputVariables.getFirstElementByPath("Value");
                if(elementValue==null){
                    elementValue = elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Value",    "");
                }
                elementValue.detach();
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE,    "Status",    "SUCCESS");
                elementOutputVariables.createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "Message",    "Rmoved all data from Cache");
            }else{
                throw new Exception("Unknown Operation:"+operation);
            }
            
            // Send Back
            out.propagate(outAssembly);
        }
        catch(Exception ex){
            ex.printStackTrace();
        }
    }
    
    //private 
    
    /**
     * get operation
     * get operation from input environment
     * 
     * @param inputEnv
     * @return
     * @throws Exception
     */
    private String getOperation(MbMessage inputEnv) throws Exception{
        MbElement elementInputEnvironment = inputEnv.getRootElement();
        MbElement elementOperation = elementInputEnvironment.getFirstElementByPath("Variables/Operation");
        String strOperation = elementOperation.getValueAsString();
        
        return strOperation;
    }
    
    /**
     * getKey
     * get key from input environment
     * 
     * @param inputEnv
     * @return
     * @throws Exception
     */
    private String getKey(MbMessage inputEnv) throws Exception{
        MbElement elementInputEnvironment = inputEnv.getRootElement();
        MbElement elementKey = elementInputEnvironment.getFirstElementByPath("Variables/BOKey");
        String strKey = elementKey.getValueAsString();
        
        return strKey;
    }
    
    /**
     * getValue
     * get value from output environment
     * 
     * @param outputEnv
     * @return
     * @throws Exception
     */
    private byte[] getValue(MbMessage outputEnv) throws Exception{
        MbElement elementOutputVariables = outputEnv.getRootElement().getFirstElementByPath("Variables");
        MbElement elementValue = elementOutputVariables.getFirstElementByPath("Value");
        byte[] byteInputObject = (byte[])elementValue.getValue();
        
        return byteInputObject;
    }
    /**
     * copyMessageHeaders
     * @param inMessage
     * @param outMessage
     * @throws MbException
     */
    private void copyMessageHeaders(MbMessage inMessage, MbMessage outMessage) throws MbException {
            MbElement outRoot = outMessage.getRootElement();
        // iterate though the headers starting with the first child of the root element
        MbElement header = inMessage.getRootElement().getFirstChild();
        while (header != null && header.getNextSibling() != null) // stop before the last child of the body
        {
            // copy the header and add it to the out message
            outRoot.addAsLastChild(header.copy());
            
            // move along to next header
            header = header.getNextSibling();
        }        
    }
}
源码下载:
http://www.blogjava.net/Files/heyang/SimpleCacheInJavaComputeNode_01.rar