裤衩的天空

Lucene并发连接实现 问题

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
import java.io.File;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.DelayCloseIndexSearcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.*;
 
public class ConcurrentLuceneConnection
implements ILuceneConnection
{
 
private static Logger log;
private transient Directory directory;
private final Analyzer analyzerForIndexing;
private final ILuceneConnection.Configuration configuration;
private final Lock indexWriteLock;
private final Lock searcherCreationLock;
private final AtomicBoolean batchMode;
private DelayCloseIndexSearcher currentSearcher;
 
public ConcurrentLuceneConnection(Directory directory, Analyzer analyzer, ILuceneConnection.Configuration configuration)
{
     indexWriteLock = new ReentrantLock();
     searcherCreationLock = new ReentrantLock();
     batchMode = new AtomicBoolean(false);
     this.directory = directory;
     analyzerForIndexing = analyzer;
     this.configuration = configuration;
}
 
public ConcurrentLuceneConnection(File path, Analyzer analyzer, ILuceneConnection.Configuration configuration)
{
     this(getDirectory(path), analyzer, configuration);
}
 
public int getNumDocs()
{
     return ((Integer)withReader(new ILuceneConnection.ReaderAction() {
 
         public Object perform(IndexReader reader)
         {
             return new Integer(reader.numDocs());
         }
 
     })).intValue();
}
 
public boolean isIndexCreated()
{
try{
     return IndexReader.indexExists(directory);
}catch(IOException e){
     throw new LuceneException(e);
}
}
 
public IndexSearcher leakSearcher()
{
try{
     return getOpenedSearcher();
}catch(Throwable e){
     flipCurrentSearcher();
     throwLuceneException(e);
     throw new IllegalStateException("Exception should have been thrown.");
}
}
 
public void optimize()
     throws LuceneException
{
     withWriter(new ILuceneConnection.WriterAction() {
 
         public void perform(IndexWriter writer)
             throws IOException
         {
             writer.optimize();
         }
     });
}
 
public void recreateIndexDirectory()
{
     indexWriteLock.lock();
     try
     {
         directory.close();
         if(directory instanceof FSDirectory)
             directory = FSDirectory.getDirectory(((FSDirectory)directory).getFile(), true);
         else
         if(directory instanceof RAMDirectory)
             directory = new RAMDirectory();
         (new IndexWriter(directory, null, true)).close();
     }
     catch(IOException e)
     {
         throw new LuceneException("Cannot create index directory.", e);
     }
     flipCurrentSearcher();
     indexWriteLock.unlock();
}
 
public void close()
{
     flipCurrentSearcher();
}
 
public void withSearch(ILuceneConnection.SearcherAction action)
     throws LuceneException
{
try{
     IndexSearcher searcher = getOpenedSearcher();
     boolean b = action.perform(searcher);
     if(!b)
         throw new UnsupportedOperationException("Searchers are always closed. The searcherAction should always return true, we do not allow them to control closing of the searchers");
     closeSearcher(searcher);
}catch(Throwable e){
     flipCurrentSearcher();
     throwLuceneException(e);
}
}
 
public Object withReader(ILuceneConnection.ReaderAction action)
     throws LuceneException
{
try{
     IndexSearcher searcher = getOpenedSearcher();
     Object obj = action.perform(searcher.getIndexReader());
     closeSearcher(searcher);
     return obj;
}catch(Throwable e){
     flipCurrentSearcher();
     throwLuceneException(e);
     return null;
}
}
 
public void withReaderAndDeletes(ILuceneConnection.ReaderAction action)
     throws LuceneException
{
     IndexReader deleter;
     indexWriteLock.lock();
     deleter = null;
     try
     {
         deleter = constructIndexDeleter();
         action.perform(deleter);
         flipCurrentSearcher();
     }
     catch(IOException e)
     {
         throw new LuceneException(e);
     }
     closeReader(deleter);
     indexWriteLock.unlock();
}
 
public void withWriter(ILuceneConnection.WriterAction action)
     throws LuceneException
{
     IndexWriter writer;
     indexWriteLock.lock();
     writer = null;
     try
     {
         writer = constructIndexWriter();
         action.perform(writer);
         flipCurrentSearcher();
     }
     catch(IOException e)
     {
         throw new LuceneException(e);
     }
     closeWriter(writer);
     indexWriteLock.unlock();
}
 
public void withDeleteAndWrites(ILuceneConnection.ReaderAction readerAction, ILuceneConnection.WriterAction writerAction)
     throws LuceneException
{
     indexWriteLock.lock();
     withReaderAndDeletes(readerAction);
     withWriter(writerAction);
     indexWriteLock.unlock();
}
 
public void withBatchUpdate(ILuceneConnection.BatchUpdateAction action)
{
try{
     indexWriteLock.lock();
     batchMode.set(true);
     action.perform();
     batchMode.set(false);
     indexWriteLock.unlock();
}catch(Exception e){
throwLuceneException(e);
}
}
 
public void flipCurrentSearcher()
{
     if(log.isDebugEnabled())
         log.debug("Closing current searcher..");
     searcherCreationLock.lock();
     if(currentSearcher != null)
     {
    try{
     currentSearcher.closeWhenDone();
     currentSearcher = null;
    }catch(Exception e){
    log.error(e);
    currentSearcher = null;
    }
     }
     searcherCreationLock.unlock();
}
 
private DelayCloseIndexSearcher getOpenedSearcher()
     throws IOException
{
     searcherCreationLock.lock();
     DelayCloseIndexSearcher delaycloseindexsearcher;
     if(currentSearcher == null)
         currentSearcher = new DelayCloseIndexSearcher(directory);
     currentSearcher.open();
     delaycloseindexsearcher = currentSearcher;
     searcherCreationLock.unlock();
     return delaycloseindexsearcher;
}
 
private IndexReader constructIndexDeleter()
{
try{
     return IndexReader.open(directory);
}catch(IOException e){
     throw new LuceneException(e);
}
}
 
private void closeReader(IndexReader reader)
{
     try
     {
         if(reader != null)
         {
             if(log.isDebugEnabled())
                 log.debug(Thread.currentThread().getName() + "## closing reader");
             reader.close();
         }
     }
     catch(IOException e)
     {
         log.error("Error closing reader. " + e, e);
     }
}
 
private void closeSearcher(IndexSearcher searcher)
{
     try
     {
         if(searcher != null)
             searcher.close();
     }
     catch(IOException e)
     {
         log.error("Error occurred while closing searcher.", e);
     }
}
 
private void closeWriter(IndexWriter writer)
{
     try
     {
         if(writer != null)
         {
             if(log.isDebugEnabled())
                 log.debug("## closing writer");
             writer.close();
         } else
         {
             log.warn("## trying to close null writer.");
         }
     }
     catch(IOException e)
     {
         log.error("Error closing writer. " + e, e);
     }
}
 
private IndexWriter constructIndexWriter()
     throws LuceneException
{
     IndexWriter indexWriter;
     if(log.isDebugEnabled())
         log.debug(Thread.currentThread().getName() + "## opening writer");
     try{
     indexWriter = new IndexWriter(directory, analyzerForIndexing, false);
     if(batchMode.get())
     {
         indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
         indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());
         indexWriter.setMergeFactor(configuration.getBatchMergeFactor());
     } else
     {
         indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs());
         indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs());
         indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor());
     }
     indexWriter.setMaxFieldLength(configuration.getMaxFieldLength());
     return indexWriter;
     }catch(IOException e){
     throw new LuceneException(e);
     }
}
 
private void throwLuceneException(Throwable e)
{
     if(e instanceof Error)
         throw (Error)e;
     if(e instanceof RuntimeException)
         throw (RuntimeException)e;
     else
         throw new LuceneException(e);
}
 
private static Directory getDirectory(File path)
{
try{
     return FSDirectory.getDirectory(path, false);
}catch(IOException e){
     throw new LuceneException(e);
}
}
 
static
{
     log = Logger.getLogger(ConcurrentLuceneConnection.class);
}
}
网上只找到这一篇针对 Lucene 分布式的介绍;测试后仍然有问题,写索引时还是会出现 lock(锁)问题。

posted on 2007-12-28 16:23 付涛 阅读(508) 评论(0)  编辑  收藏 所属分类: Lucene研究


只有注册用户登录后才能发表评论。


网站导航: