import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
import java.io.File;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.DelayCloseIndexSearcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.*;
public class ConcurrentLuceneConnection
implements ILuceneConnection
{
private static Logger log;
private transient Directory directory;
private final Analyzer analyzerForIndexing;
private final ILuceneConnection.Configuration configuration;
private final Lock indexWriteLock;
private final Lock searcherCreationLock;
private final AtomicBoolean batchMode;
private DelayCloseIndexSearcher currentSearcher;
public ConcurrentLuceneConnection(Directory directory, Analyzer analyzer, ILuceneConnection.Configuration configuration)
{
indexWriteLock =
new ReentrantLock();
searcherCreationLock =
new ReentrantLock();
batchMode =
new AtomicBoolean(
false);
this.directory = directory;
analyzerForIndexing = analyzer;
this.configuration = configuration;
}
public ConcurrentLuceneConnection(File path, Analyzer analyzer, ILuceneConnection.Configuration configuration)
{
this(getDirectory(path), analyzer, configuration);
}
public int getNumDocs()
{
return ((Integer)withReader(
new ILuceneConnection.ReaderAction()
{
public Object perform(IndexReader reader)
{
return new Integer(reader.numDocs());
}
})).intValue();
}
public boolean isIndexCreated()
{
try{
return IndexReader.indexExists(directory);
}catch(IOException e)
{
throw new LuceneException(e);
}
}
public IndexSearcher leakSearcher()
{
try{
return getOpenedSearcher();
}catch(Throwable e)
{
flipCurrentSearcher();
throwLuceneException(e);
throw new IllegalStateException(
"Exception should have been thrown.");
}
}
public void optimize()
throws LuceneException
{
withWriter(
new ILuceneConnection.WriterAction()
{
public void perform(IndexWriter writer)
throws IOException
{
writer.optimize();
}
});
}
public void recreateIndexDirectory()
{
indexWriteLock.lock();
try
{
directory.close();
if(directory
instanceof FSDirectory)
directory = FSDirectory.getDirectory(((FSDirectory)directory).getFile(),
true);
else
if(directory
instanceof RAMDirectory)
directory =
new RAMDirectory();
(
new IndexWriter(directory, null,
true)).close();
}
catch(IOException e)
{
throw new LuceneException(
"Cannot create index directory.", e);
}
flipCurrentSearcher();
indexWriteLock.unlock();
}
public void close()
{
flipCurrentSearcher();
}
public void withSearch(ILuceneConnection.SearcherAction action)
throws LuceneException
{
try{
IndexSearcher searcher = getOpenedSearcher();
boolean b = action.perform(searcher);
if(!b)
throw new UnsupportedOperationException(
"Searchers are always closed. The searcherAction should always return true, we do not allow them to control closing of the searchers");
closeSearcher(searcher);
}catch(Throwable e)
{
flipCurrentSearcher();
throwLuceneException(e);
}
}
public Object withReader(ILuceneConnection.ReaderAction action)
throws LuceneException
{
try{
IndexSearcher searcher = getOpenedSearcher();
Object obj = action.perform(searcher.getIndexReader());
closeSearcher(searcher);
return obj;
}catch(Throwable e)
{
flipCurrentSearcher();
throwLuceneException(e);
return null;
}
}
public void withReaderAndDeletes(ILuceneConnection.ReaderAction action)
throws LuceneException
{
IndexReader deleter;
indexWriteLock.lock();
deleter =
null;
try
{
deleter = constructIndexDeleter();
action.perform(deleter);
flipCurrentSearcher();
}
catch(IOException e)
{
throw new LuceneException(e);
}
closeReader(deleter);
indexWriteLock.unlock();
}
public void withWriter(ILuceneConnection.WriterAction action)
throws LuceneException
{
IndexWriter writer;
indexWriteLock.lock();
writer =
null;
try
{
writer = constructIndexWriter();
action.perform(writer);
flipCurrentSearcher();
}
catch(IOException e)
{
throw new LuceneException(e);
}
closeWriter(writer);
indexWriteLock.unlock();
}
public void withDeleteAndWrites(ILuceneConnection.ReaderAction readerAction, ILuceneConnection.WriterAction writerAction)
throws LuceneException
{
indexWriteLock.lock();
withReaderAndDeletes(readerAction);
withWriter(writerAction);
indexWriteLock.unlock();
}
public void withBatchUpdate(ILuceneConnection.BatchUpdateAction action)
{
try{
indexWriteLock.lock();
batchMode.set(
true);
action.perform();
batchMode.set(
false);
indexWriteLock.unlock();
}catch(Exception e)
{
throwLuceneException(e);
}
}
public void flipCurrentSearcher()
{
if(log.isDebugEnabled())
log.debug(
"Closing current searcher..");
searcherCreationLock.lock();
if(currentSearcher !=
null)
{
try{
currentSearcher.closeWhenDone();
currentSearcher =
null;
}catch(Exception e)
{
log.error(e);
currentSearcher =
null;
}
}
searcherCreationLock.unlock();
}
private DelayCloseIndexSearcher getOpenedSearcher()
throws IOException
{
searcherCreationLock.lock();
DelayCloseIndexSearcher delaycloseindexsearcher;
if(currentSearcher ==
null)
currentSearcher =
new DelayCloseIndexSearcher(directory);
currentSearcher.open();
delaycloseindexsearcher = currentSearcher;
searcherCreationLock.unlock();
return delaycloseindexsearcher;
}
private IndexReader constructIndexDeleter()
{
try{
return IndexReader.open(directory);
}catch(IOException e)
{
throw new LuceneException(e);
}
}
private void closeReader(IndexReader reader)
{
try
{
if(reader !=
null)
{
if(log.isDebugEnabled())
log.debug(Thread.currentThread().getName() +
"## closing reader");
reader.close();
}
}
catch(IOException e)
{
log.error(
"Error closing reader. " + e, e);
}
}
private void closeSearcher(IndexSearcher searcher)
{
try
{
if(searcher !=
null)
searcher.close();
}
catch(IOException e)
{
log.error(
"Error occurred while closing searcher.", e);
}
}
private void closeWriter(IndexWriter writer)
{
try
{
if(writer !=
null)
{
if(log.isDebugEnabled())
log.debug(
"## closing writer");
writer.close();
} else
{
log.warn(
"## trying to close null writer.");
}
}
catch(IOException e)
{
log.error(
"Error closing writer. " + e, e);
}
}
private IndexWriter constructIndexWriter()
throws LuceneException
{
IndexWriter indexWriter;
if(log.isDebugEnabled())
log.debug(Thread.currentThread().getName() +
"## opening writer");
try{
indexWriter =
new IndexWriter(directory, analyzerForIndexing,
false);
if(batchMode.get())
{
indexWriter.setMaxBufferedDocs(configuration.getBatchMaxBufferedDocs());
indexWriter.setMaxMergeDocs(configuration.getBatchMaxMergeDocs());
indexWriter.setMergeFactor(configuration.getBatchMergeFactor());
} else
{
indexWriter.setMaxBufferedDocs(configuration.getInteractiveMaxBufferedDocs());
indexWriter.setMaxMergeDocs(configuration.getInteractiveMaxMergeDocs());
indexWriter.setMergeFactor(configuration.getInteractiveMergeFactor());
}
indexWriter.setMaxFieldLength(configuration.getMaxFieldLength());
return indexWriter;
}catch(IOException e)
{
throw new LuceneException(e);
}
}
private void throwLuceneException(Throwable e)
{
if(e
instanceof Error)
throw (Error)e;
if(e
instanceof RuntimeException)
throw (RuntimeException)e;
else
throw new LuceneException(e);
}
private static Directory getDirectory(File path)
{
try{
return FSDirectory.getDirectory(path,
false);
}catch(IOException e)
{
throw new LuceneException(e);
}
}
static
{
log = Logger.getLogger(ConcurrentLuceneConnection.class);
}
}
// 网上只找到了这一篇针对 Lucene 分布式的介绍;测试后仍有问题,在写索引时还是会出现 LOCK 问题。