/*
 * Lucene example source code file: ParallelReader.java
 *
 * This Lucene source code file is included in the DevDaily.com
 * "Java Source Code Warehouse" project. The intent of that project is to
 * help you "Learn Java by Example" (TM).
 */
package org.apache.lucene.index;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.util.MapBackedSet;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/** An IndexReader which reads multiple, parallel indexes. Each index added
* must have the same number of documents, but typically each contains
* different fields. Each document contains the union of the fields of all
* documents with the same document number. When searching, matches for a
* query term are from the first index added that has the field.
*
* <p>This is useful, e.g., with collections that have large fields which
* change rarely and small fields that change more frequently. The smaller
* fields may be re-indexed in a new index and both indexes may be searched
* together.
*
* <p>Warning: It is up to you to make sure all indexes
* are created and modified the same way. For example, if you add
* documents to one index, you need to add the same documents in the
* same order to the other indexes. <em>Failure to do so will result in
* undefined behavior</em>.
*/
public class ParallelReader extends IndexReader {
private List<IndexReader> readers = new ArrayList();
private List<Boolean> decrefOnClose = new ArrayList(); // remember which subreaders to decRef on close
boolean incRefReaders = false;
private SortedMap<String,IndexReader> fieldToReader = new TreeMap();
private Map<IndexReader,Collection readerToFields = new HashMap>();
private List<IndexReader> storedFieldReaders = new ArrayList();
private int maxDoc;
private int numDocs;
private boolean hasDeletions;
/** Construct a ParallelReader that closes its subreaders when it is closed.
 * <p>Equivalent to calling {@code ParallelReader(true)}.
 */
public ParallelReader() throws IOException {
  this(true);
}
/** Construct a ParallelReader.
 * @param closeSubReaders indicates whether the subreaders should be closed
 * when this ParallelReader is closed
 * @throws IOException declared for API consistency; no I/O is performed here
 */
public ParallelReader(boolean closeSubReaders) throws IOException {
  super();
  // If we don't own the subreaders' lifecycle, incRef them on add() so they
  // stay open while this reader is in use, and decRef them on close().
  this.incRefReaders = !closeSubReaders;
  // MapBackedSet requires a Map<T,Boolean>; a concurrent map lets listeners be
  // registered and removed from any thread.
  readerFinishedListeners = new MapBackedSet<ReaderFinishedListener>(
      new ConcurrentHashMap<ReaderFinishedListener,Boolean>());
}
/** {@inheritDoc} */
@Override
public String toString() {
  final StringBuilder sb = new StringBuilder("ParallelReader(");
  // Join the subreaders with ", " separators.
  String separator = "";
  for (final IndexReader subReader : readers) {
    sb.append(separator).append(subReader);
    separator = ", ";
  }
  return sb.append(')').toString();
}
/** Add an IndexReader.
 * @throws IOException if there is a low-level IO error
 */
public void add(IndexReader reader) throws IOException {
  ensureOpen();
  // Delegate to the two-argument form; stored fields are included by default.
  add(reader, false);
}
/** Add an IndexReader whose stored fields will not be returned. This can
 * accelerate search when stored fields are only needed from a subset of
 * the IndexReaders.
 *
 * @throws IllegalArgumentException if not all indexes contain the same number
 * of documents
 * @throws IllegalArgumentException if not all indexes have the same value
 * of {@link IndexReader#maxDoc()}
 * @throws IOException if there is a low-level IO error
 */
public void add(IndexReader reader, boolean ignoreStoredFields)
  throws IOException {
  ensureOpen();
  // The first reader added defines the expected doc counts and deletion state.
  if (readers.isEmpty()) {
    this.maxDoc = reader.maxDoc();
    this.numDocs = reader.numDocs();
    this.hasDeletions = reader.hasDeletions();
  }
  // Verify compatibility with the readers added so far.
  if (reader.maxDoc() != maxDoc) {
    throw new IllegalArgumentException
      ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
  }
  if (reader.numDocs() != numDocs) {
    throw new IllegalArgumentException
      ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());
  }
  final Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
  readerToFields.put(reader, fields);
  // The first reader added that contains a given field "owns" that field.
  for (final String field : fields) {
    if (fieldToReader.get(field) == null) {
      fieldToReader.put(field, reader);
    }
  }
  if (!ignoreStoredFields) {
    storedFieldReaders.add(reader);
  }
  readers.add(reader);
  if (incRefReaders) {
    reader.incRef();
  }
  // Remember whether close() should decRef (shared) or close (owned) this reader.
  decrefOnClose.add(Boolean.valueOf(incRefReaders));
}
@Override
public synchronized Object clone() {
  try {
    // A clone is a reopen that always produces new subreader instances.
    return doReopen(true);
  } catch (Exception e) {
    // clone() cannot declare checked exceptions, so wrap and rethrow.
    throw new RuntimeException(e);
  }
}
/**
 * Tries to reopen the subreaders.
 * <br>
 * If one or more subreaders could be re-opened (i. e. subReader.reopen()
 * returned a new instance != subReader), then a new ParallelReader instance
 * is returned, otherwise this instance is returned.
 * <p>
 * A re-opened instance might share one or more subreaders with the old
 * instance. Index modification operations result in undefined behavior
 * when performed before the old instance is closed.
 * (see {@link IndexReader#reopen()}).
 * <p>
 * If subreaders are shared, then the reference count of those
 * readers is increased to ensure that the subreaders remain open
 * until the last referring reader is closed.
 *
 * @throws CorruptIndexException if the index is corrupt
 * @throws IOException if there is a low-level IO error
 */
@Override
public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
  // doClone=false: subreaders are reopened, not cloned.
  return doReopen(false);
}
/** Shared implementation of {@link #reopen()} (doClone=false) and
 * {@link #clone()} (doClone=true). Reopens or clones every subreader; if at
 * least one subreader changed, returns a new ParallelReader built over the
 * new subreaders, otherwise returns this instance.
 * @param doClone if true, clone each subreader instead of reopening it
 * @throws CorruptIndexException if the index is corrupt
 * @throws IOException if there is a low-level IO error
 */
protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
  ensureOpen();
  boolean reopened = false;
  List<IndexReader> newReaders = new ArrayList<IndexReader>();
  boolean success = false;
  try {
    for (final IndexReader oldReader : readers) {
      IndexReader newReader = null;
      if (doClone) {
        newReader = (IndexReader) oldReader.clone();
      } else {
        newReader = oldReader.reopen();
      }
      newReaders.add(newReader);
      // if at least one of the subreaders was updated we remember that
      // and return a new ParallelReader
      if (newReader != oldReader) {
        reopened = true;
      }
    }
    success = true;
  } finally {
    if (!success && reopened) {
      // An exception occurred after at least one subreader was refreshed:
      // close the new instances we created, leaving shared ones untouched.
      for (int i = 0; i < newReaders.size(); i++) {
        IndexReader r = newReaders.get(i);
        if (r != readers.get(i)) {
          try {
            r.close();
          } catch (IOException ignore) {
            // keep going - we want to clean up as much as possible
          }
        }
      }
    }
  }
  if (reopened) {
    List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
    ParallelReader pr = new ParallelReader();
    for (int i = 0; i < readers.size(); i++) {
      IndexReader oldReader = readers.get(i);
      IndexReader newReader = newReaders.get(i);
      if (newReader == oldReader) {
        // Shared with the old ParallelReader: incRef so it stays open until
        // both owners are closed; on close() we decRef instead of closing.
        newDecrefOnClose.add(Boolean.TRUE);
        newReader.incRef();
      } else {
        // this is a new subreader instance, so on close() we don't
        // decRef but close it
        newDecrefOnClose.add(Boolean.FALSE);
      }
      pr.add(newReader, !storedFieldReaders.contains(oldReader));
    }
    pr.decrefOnClose = newDecrefOnClose;
    pr.incRefReaders = incRefReaders;
    return pr;
  } else {
    // No subreader was refreshed
    return this;
  }
}
/** Returns the number of documents, as reported by the first subreader added. */
@Override
public int numDocs() {
  // Don't call ensureOpen() here (it could affect performance)
  return this.numDocs;
}

/** Returns one greater than the largest document number, as reported by the
 * first subreader added. */
@Override
public int maxDoc() {
  // Don't call ensureOpen() here (it could affect performance)
  return this.maxDoc;
}

/** Returns true if any documents have been deleted. */
@Override
public boolean hasDeletions() {
  // Don't call ensureOpen() here (it could affect performance)
  return this.hasDeletions;
}
// check first reader
@Override
public boolean isDeleted(int n) {
  // Don't call ensureOpen() here (it could affect performance)
  // Deletions are applied to every subreader, so the first one is authoritative.
  return !readers.isEmpty() && readers.get(0).isDeleted(n);
}
// delete in all readers
@Override
protected void doDelete(int n) throws CorruptIndexException, IOException {
  // Keep the parallel indexes in sync by deleting the doc from every subreader.
  for (final IndexReader subReader : readers) {
    subReader.deleteDocument(n);
  }
  hasDeletions = true;
}
// undeleteAll in all readers
@Override
protected void doUndeleteAll() throws CorruptIndexException, IOException {
  // Restore all deleted docs in every subreader to keep them in sync.
  for (final IndexReader subReader : readers) {
    subReader.undeleteAll();
  }
  hasDeletions = false;
}
// append fields from storedFieldReaders
@Override
public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
  ensureOpen();
  final Document result = new Document();
  for (final IndexReader reader : storedFieldReaders) {
    // With no selector, every stored-field reader contributes its fields.
    boolean include = (fieldSelector == null);
    if (!include) {
      // Only consult this reader if the selector would load at least one
      // of the fields it contains.
      for (final String field : readerToFields.get(reader)) {
        if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
          include = true;
          break;
        }
      }
    }
    if (include) {
      // Merge this reader's stored fields into the combined document.
      for (final Fieldable field : reader.document(n, fieldSelector).getFields()) {
        result.add(field);
      }
    }
  }
  return result;
}
// get all vectors
/** Collects the term frequency vector for every field of document {@code n},
 * each fetched from the reader that owns that field. Fields with no vector
 * are skipped.
 * @throws IOException if there is a low-level IO error
 */
@Override
public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
  ensureOpen();
  ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
  for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {
    String field = e.getKey();
    IndexReader reader = e.getValue();
    TermFreqVector vector = reader.getTermFreqVector(n, field);
    if (vector != null)
      results.add(vector);
  }
  return results.toArray(new TermFreqVector[results.size()]);
}
@Override
public TermFreqVector getTermFreqVector(int n, String field)
    throws IOException {
  ensureOpen();
  // Delegate to whichever subreader owns this field, if any.
  final IndexReader reader = fieldToReader.get(field);
  if (reader == null) {
    return null;
  }
  return reader.getTermFreqVector(n, field);
}
@Override
public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
  ensureOpen();
  // Feed the mapper from the subreader that owns this field; no-op otherwise.
  final IndexReader owner = fieldToReader.get(field);
  if (owner != null) {
    owner.getTermFreqVector(docNumber, field, mapper);
  }
}
@Override
public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
  ensureOpen();
  // Feed the mapper each field's vector from the reader that owns that field.
  for (final Map.Entry<String,IndexReader> entry : fieldToReader.entrySet()) {
    entry.getValue().getTermFreqVector(docNumber, entry.getKey(), mapper);
  }
}
@Override
public boolean hasNorms(String field) throws IOException {
  ensureOpen();
  // A field without an owning reader has no norms.
  final IndexReader reader = fieldToReader.get(field);
  return reader != null && reader.hasNorms(field);
}
@Override
public byte[] norms(String field) throws IOException {
  ensureOpen();
  // Norms come from the subreader that owns the field, or null if none does.
  final IndexReader reader = fieldToReader.get(field);
  if (reader == null) {
    return null;
  }
  return reader.norms(field);
}
@Override
public void norms(String field, byte[] result, int offset)
    throws IOException {
  ensureOpen();
  // Copy norms into the caller's buffer; a field with no owning reader is a no-op.
  final IndexReader reader = fieldToReader.get(field);
  if (reader != null) {
    reader.norms(field, result, offset);
  }
}
@Override
protected void doSetNorm(int n, String field, byte value)
    throws CorruptIndexException, IOException {
  // Only the subreader that owns the field carries its norms.
  final IndexReader reader = fieldToReader.get(field);
  if (reader != null) {
    reader.doSetNorm(n, field, value);
  }
}
@Override
public TermEnum terms() throws IOException {
  ensureOpen();
  // Enumerate all terms across all fields, field by field.
  return new ParallelTermEnum();
}

@Override
public TermEnum terms(Term term) throws IOException {
  ensureOpen();
  // Start the enumeration at the given term within its field.
  return new ParallelTermEnum(term);
}

@Override
public int docFreq(Term term) throws IOException {
  ensureOpen();
  // A term's doc frequency comes from the reader that owns its field.
  final IndexReader reader = fieldToReader.get(term.field());
  if (reader == null) {
    return 0;
  }
  return reader.docFreq(term);
}

@Override
public TermDocs termDocs(Term term) throws IOException {
  ensureOpen();
  return new ParallelTermDocs(term);
}

@Override
public TermDocs termDocs() throws IOException {
  ensureOpen();
  return new ParallelTermDocs();
}

@Override
public TermPositions termPositions(Term term) throws IOException {
  ensureOpen();
  return new ParallelTermPositions(term);
}

@Override
public TermPositions termPositions() throws IOException {
  ensureOpen();
  return new ParallelTermPositions();
}
/**
 * Checks recursively if all subreaders are up to date.
 */
@Override
public boolean isCurrent() throws CorruptIndexException, IOException {
  for (final IndexReader subReader : readers) {
    if (!subReader.isCurrent()) {
      return false;
    }
  }
  // all subreaders are up to date
  return true;
}
/**
 * Checks recursively if all subindexes are optimized
 */
@Override
public boolean isOptimized() {
  for (final IndexReader subReader : readers) {
    if (!subReader.isOptimized()) {
      return false;
    }
  }
  // all subindexes are optimized
  return true;
}
/** Not implemented.
 * @throws UnsupportedOperationException always; a ParallelReader spans several
 * indexes, so no single version number applies
 */
@Override
public long getVersion() {
  throw new UnsupportedOperationException("ParallelReader does not support this method.");
}

// for testing
IndexReader[] getSubReaders() {
  return readers.toArray(new IndexReader[readers.size()]);
}
@Override
protected void doCommit(Map<String,String> commitUserData) throws IOException {
  // Commit pending changes in every subreader.
  for (final IndexReader subReader : readers) {
    subReader.commit(commitUserData);
  }
}
@Override
protected synchronized void doClose() throws IOException {
  // Release each subreader the way it was acquired: shared readers are
  // decRef'd, owned readers are closed outright (see decrefOnClose).
  for (int i = 0; i < readers.size(); i++) {
    final IndexReader subReader = readers.get(i);
    if (decrefOnClose.get(i).booleanValue()) {
      subReader.decRef();
    } else {
      subReader.close();
    }
  }
}
/** Returns the union of the field names of all subreaders for the given
 * field option.
 * @param fieldNames which category of fields to return
 */
@Override
public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
  ensureOpen();
  Set<String> fieldSet = new HashSet<String>();
  for (final IndexReader reader : readers) {
    Collection<String> names = reader.getFieldNames(fieldNames);
    fieldSet.addAll(names);
  }
  return fieldSet;
}
// Enumerates terms across all parallel subreaders: walks fieldToReader's
// (sorted) field names in order, delegating to the owning reader's TermEnum
// for each field in turn.
private class ParallelTermEnum extends TermEnum {
// Field currently being enumerated; null when the index has no fields.
private String field;
// Iterates the field names after 'field'; lazily created in next().
private Iterator<String> fieldIterator;
// TermEnum of the reader owning 'field'; null when exhausted or no fields.
private TermEnum termEnum;
// Starts at the first term of the first (lowest-sorting) field.
public ParallelTermEnum() throws IOException {
try {
field = fieldToReader.firstKey();
} catch(NoSuchElementException e) {
// No fields, so keep field == null, termEnum == null
return;
}
if (field != null)
termEnum = fieldToReader.get(field).terms();
}
// Starts at the given term within its field's owning reader.
public ParallelTermEnum(Term term) throws IOException {
field = term.field();
IndexReader reader = fieldToReader.get(field);
if (reader!=null)
termEnum = reader.terms(term);
}
@Override
public boolean next() throws IOException {
if (termEnum==null)
return false;
// another term in this field?
// NOTE(review): field names are compared with == rather than equals();
// this relies on Term.field() strings being interned — verify before reusing
// this pattern elsewhere.
if (termEnum.next() && termEnum.term().field()==field)
return true; // yes, keep going
termEnum.close(); // close old termEnum
// find the next field with terms, if any
if (fieldIterator==null) {
fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
fieldIterator.next(); // Skip field to get next one
}
while (fieldIterator.hasNext()) {
field = fieldIterator.next();
termEnum = fieldToReader.get(field).terms(new Term(field));
// terms(t) positions at the first term >= t, so term() is already valid here.
Term term = termEnum.term();
if (term!=null && term.field()==field)
return true;
else
termEnum.close();
}
return false; // no more fields
}
// Current term, or null when the enumeration is exhausted or empty.
@Override
public Term term() {
if (termEnum==null)
return null;
return termEnum.term();
}
// Doc frequency of the current term; 0 when there is no current term.
@Override
public int docFreq() {
if (termEnum==null)
return 0;
return termEnum.docFreq();
}
@Override
public void close() throws IOException {
if (termEnum!=null)
termEnum.close();
}
}
// wrap a TermDocs in order to support seek(Term)
private class ParallelTermDocs implements TermDocs {
  // Enumerator from the subreader that owns the current term's field; null
  // when no reader owns the field (all operations then report exhaustion).
  protected TermDocs termDocs;

  public ParallelTermDocs() {}

  public ParallelTermDocs(Term term) throws IOException {
    if (term == null) {
      // A null term enumerates all non-deleted docs; use the first subreader.
      termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
    } else {
      seek(term);
    }
  }

  public int doc() {
    return termDocs.doc();
  }

  public int freq() {
    return termDocs.freq();
  }

  public void seek(Term term) throws IOException {
    // Position on the subreader that owns the term's field, if any.
    final IndexReader reader = fieldToReader.get(term.field());
    termDocs = (reader == null) ? null : reader.termDocs(term);
  }

  public void seek(TermEnum termEnum) throws IOException {
    seek(termEnum.term());
  }

  public boolean next() throws IOException {
    return termDocs != null && termDocs.next();
  }

  public int read(final int[] docs, final int[] freqs) throws IOException {
    return (termDocs == null) ? 0 : termDocs.read(docs, freqs);
  }

  public boolean skipTo(int target) throws IOException {
    return termDocs != null && termDocs.skipTo(target);
  }

  public void close() throws IOException {
    if (termDocs != null) {
      termDocs.close();
    }
  }
}
// Like ParallelTermDocs, but backed by a TermPositions so position and
// payload data are available.
private class ParallelTermPositions
    extends ParallelTermDocs implements TermPositions {

  public ParallelTermPositions() {}

  public ParallelTermPositions(Term term) throws IOException {
    seek(term);
  }

  @Override
  public void seek(Term term) throws IOException {
    // Unlike the base class, obtain a TermPositions from the owning reader.
    final IndexReader reader = fieldToReader.get(term.field());
    termDocs = (reader == null) ? null : reader.termPositions(term);
  }

  public int nextPosition() throws IOException {
    // It is an error to call this if there is no next position, e.g. if termDocs==null
    return ((TermPositions) termDocs).nextPosition();
  }

  public int getPayloadLength() {
    return ((TermPositions) termDocs).getPayloadLength();
  }

  public byte[] getPayload(byte[] data, int offset) throws IOException {
    return ((TermPositions) termDocs).getPayload(data, offset);
  }

  // TODO: Remove warning after API has been finalized
  public boolean isPayloadAvailable() {
    return ((TermPositions) termDocs).isPayloadAvailable();
  }
}
@Override
public void addReaderFinishedListener(ReaderFinishedListener listener) {
  // Register on this reader, then propagate to every subreader.
  super.addReaderFinishedListener(listener);
  for (final IndexReader subReader : readers) {
    subReader.addReaderFinishedListener(listener);
  }
}

@Override
public void removeReaderFinishedListener(ReaderFinishedListener listener) {
  // Unregister from this reader, then propagate to every subreader.
  super.removeReaderFinishedListener(listener);
  for (final IndexReader subReader : readers) {
    subReader.removeReaderFinishedListener(listener);
  }
}
}
/*
 * Other Lucene examples (source code examples):
 * a short list of links related to this Lucene ParallelReader.java source
 * code file is available on DevDaily.com.
 */