home | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Lucene example source code file (DocumentsWriter.java)

This example Lucene source code file (DocumentsWriter.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Lucene tags/keywords

analyzer, arraylist, byteblockallocator, documentswriterthreadstate, documentswriterthreadstate, docwriter, docwriter, io, ioexception, ioexception, override, override, string, text, throwable, util, waitqueue

The Lucene DocumentsWriter.java source code

package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.io.PrintStream;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ThreadInterruptedException;


/**
 * This class accepts multiple added documents and directly
 * writes a single segment file.  It does this more
 * efficiently than creating a single segment per document
 * (with DocumentWriter) and doing standard merges on those
 * segments.
 *
 * Each added document is passed to the {@link DocConsumer},
 * which in turn processes the document and interacts with
 * other consumers in the indexing chain.  Certain
 * consumers, like {@link StoredFieldsWriter} and {@link
 * TermVectorsTermsWriter}, digest a document and
 * immediately write bytes to the "doc store" files (ie,
 * they do not consume RAM per document, except while they
 * are processing the document).
 *
 * Other consumers, eg {@link FreqProxTermsWriter} and
 * {@link NormsWriter}, buffer bytes in RAM and flush only
 * when a new segment is produced.
 *
 * Once we have used our allowed RAM buffer, or the number
 * of added docs is large enough (in the case we are
 * flushing by doc count instead of RAM usage), we create a
 * real segment and flush it to the Directory.
 *
 * Threads:
 *
 * Multiple threads are allowed into addDocument at once.
 * There is an initial synchronized call to getThreadState
 * which allocates a ThreadState for this thread.  The same
 * thread will get the same ThreadState over time (thread
 * affinity) so that if there are consistent patterns (for
 * example each thread is indexing a different content
 * source) then we make better use of RAM.  Then
 * processDocument is called on that ThreadState without
 * synchronization (most of the "heavy lifting" is in this
 * call).  Finally the synchronized "finishDocument" is
 * called to flush changes to the directory.
 *
 * When flush is called by IndexWriter we forcefully idle
 * all threads and flush only once they are all idle.  This
 * means you can call flush with a given thread even while
 * other threads are actively adding/deleting documents.
 *
 *
 * Exceptions:
 *
 * Because this class directly updates in-memory posting
 * lists, and flushes stored fields and term vectors
 * directly to files in the directory, there are certain
 * limited times when an exception can corrupt this state.
 * For example, a disk full while flushing stored fields
 * leaves this file in a corrupt state.  Or, an OOM
 * exception while appending to the in-memory posting lists
 * can corrupt that posting list.  We call such exceptions
 * "aborting exceptions".  In these cases we must call
 * abort() to discard all docs added since the last flush.
 *
 * All other exceptions ("non-aborting exceptions") can
 * still partially update the index structures.  These
 * updates are consistent, but, they represent only a part
 * of the document seen up until the exception was hit.
 * When this happens, we immediately mark the document as
 * deleted so that the document is always atomically ("all
 * or none") added to the index.
 */

final class DocumentsWriter {
  final AtomicLong bytesUsed = new AtomicLong(0);
  IndexWriter writer;
  Directory directory;

  String segment;                         // Current segment we are working on

  private int nextDocID;                  // Next docID to be added
  private int numDocs;                    // # of docs added, but not yet flushed

  // ThreadState instances allocated so far.  getThreadState
  // grows this array on demand until it holds
  // maxThreadStates entries; beyond that, threads share
  // ThreadStates
  private DocumentsWriterThreadState[] threadStates = new DocumentsWriterThreadState[0];

  // Thread -> ThreadState affinity; cleared by doAfterFlush
  private final HashMap<Thread,DocumentsWriterThreadState> threadBindings = new HashMap<Thread,DocumentsWriterThreadState>();

  boolean bufferIsFull;                   // True when it's time to write segment
  private boolean aborting;               // True if an abort is pending

  PrintStream infoStream;
  int maxFieldLength = IndexWriter.DEFAULT_MAX_FIELD_LENGTH;
  Similarity similarity;

  // max # simultaneous threads; if there are more than
  // this, they wait for others to finish first
  private final int maxThreadStates;

  // Deletes for our still-in-RAM (to be flushed next) segment
  private BufferedDeletes pendingDeletes = new BufferedDeletes(false);
  
  /** Mutable holder for the settings and the document that a
   *  thread state is currently working on. */
  static class DocState {
    DocumentsWriter docWriter;
    Analyzer analyzer;
    int maxFieldLength;
    PrintStream infoStream;
    Similarity similarity;
    int docID;
    Document doc;
    String maxTermPrefix;

    /** Forwards the named test point to the owning
     *  IndexWriter.  Only called by asserts. */
    public boolean testPoint(String name) {
      final IndexWriter indexWriter = docWriter.writer;
      return indexWriter.testPoint(name);
    }

    /** Drops the analyzer and document references so that a
     *  potentially large document is not kept reachable
     *  through this holder. */
    public void clear() {
      analyzer = null;
      doc = null;
    }
  }

  /** Consumer returns this on each doc.  This holds any
   *  state that must be flushed synchronized "in docID
   *  order".  We gather these and flush them in order. */
  abstract static class DocWriter {
    // Link to another DocWriter; set through setNext
    DocWriter next;
    // docID of the document this writer belongs to
    int docID;
    /** Completes the work buffered for this document. */
    abstract void finish() throws IOException;
    /** Discards the work buffered for this document. */
    abstract void abort();
    /** Reports the size of this writer's buffered state, in bytes. */
    abstract long sizeInBytes();

    /** Sets the {@link #next} link. */
    void setNext(DocWriter next) {
      this.next = next;
    }
  }

  /**
   * Create and return a new {@link PerDocBuffer}.
   */
  PerDocBuffer newPerDocBuffer() {
    return new PerDocBuffer();
  }

  /**
   * A {@link RAMFile} for DocWriters whose byte blocks are
   * obtained from, and handed back to, perDocAllocator.
   */
  class PerDocBuffer extends RAMFile {

    /**
     * Takes the next block from perDocAllocator; the requested
     * size is expected to equal PER_DOC_BLOCK_SIZE.
     */
    @Override
    protected byte[] newBuffer(int size) {
      assert size == PER_DOC_BLOCK_SIZE;
      return perDocAllocator.getByteBlock();
    }

    /**
     * Hands every block held by this buffer back to
     * perDocAllocator and resets the buffer to empty.  Does
     * nothing when no blocks are held.
     */
    synchronized void recycle() {
      if (buffers.size() == 0) {
        return;
      }

      setLength(0);

      // Give the blocks back before forgetting them
      perDocAllocator.recycleByteBlocks(buffers);
      buffers.clear();
      sizeInBytes = 0;

      assert numBuffers() == 0;
    }
  }
  
  /**
   * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
   * which returns the DocConsumer that the DocumentsWriter calls to process the
   * documents.
   */
  abstract static class IndexingChain {
    /** Builds the consumer chain for the given DocumentsWriter;
     *  invoked from the DocumentsWriter constructor. */
    abstract DocConsumer getChain(DocumentsWriter documentsWriter);
  }
  
  /** The stock chain: a DocFieldProcessor feeding a DocInverter,
   *  which in turn feeds the terms hash (postings plus term
   *  vectors) and the norms writer. */
  static final IndexingChain defaultIndexingChain = new IndexingChain() {

    @Override
    DocConsumer getChain(DocumentsWriter documentsWriter) {
      /*
      This is the current indexing chain:

      DocConsumer / DocConsumerPerThread
        --> code: DocFieldProcessor / DocFieldProcessorPerThread
          --> DocFieldConsumer / DocFieldConsumerPerThread / DocFieldConsumerPerField
            --> code: DocFieldConsumers / DocFieldConsumersPerThread / DocFieldConsumersPerField
              --> code: DocInverter / DocInverterPerThread / DocInverterPerField
                --> InvertedDocConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                  --> code: TermsHash / TermsHashPerThread / TermsHashPerField
                    --> TermsHashConsumer / TermsHashConsumerPerThread / TermsHashConsumerPerField
                      --> code: FreqProxTermsWriter / FreqProxTermsWriterPerThread / FreqProxTermsWriterPerField
                      --> code: TermVectorsTermsWriter / TermVectorsTermsWriterPerThread / TermVectorsTermsWriterPerField
                --> InvertedDocEndConsumer / InvertedDocConsumerPerThread / InvertedDocConsumerPerField
                  --> code: NormsWriter / NormsWriterPerThread / NormsWriterPerField
              --> code: StoredFieldsWriter / StoredFieldsWriterPerThread / StoredFieldsWriterPerField
      */

      // Leaves first: the two terms-hash consumers
      final TermsHashConsumer vectorsConsumer = new TermVectorsTermsWriter(documentsWriter);
      final TermsHashConsumer postingsConsumer = new FreqProxTermsWriter();

      // Secondary hash (term vectors) is chained behind the
      // primary hash (postings)
      final TermsHash vectorsHash = new TermsHash(documentsWriter, false, vectorsConsumer, null);
      final InvertedDocConsumer postingsHash = new TermsHash(documentsWriter, true, postingsConsumer, vectorsHash);

      final NormsWriter norms = new NormsWriter();
      final DocInverter inverter = new DocInverter(postingsHash, norms);
      return new DocFieldProcessor(documentsWriter, inverter);
    }
  };

  // Head of the indexing chain; obtained from the configured
  // IndexingChain in the constructor
  final DocConsumer consumer;

  // NOTE(review): the next comment describes a RAM limit, but
  // no such field is declared here -- it looks orphaned;
  // presumably the setting is now read from config. Confirm.
  // How much RAM we can use before flushing.  This is 0 if
  // we are flushing by doc count instead.

  private final IndexWriterConfig config;

  // Set by close(); once true, waitReady throws
  // AlreadyClosedException
  private boolean closed;
  private final FieldInfos fieldInfos;

  // Receives the frozen delete packets built in pushDeletes
  private final BufferedDeletesStream bufferedDeletesStream;
  // Taken from the owning IndexWriter in the constructor
  private final IndexWriter.FlushControl flushControl;

  DocumentsWriter(IndexWriterConfig config, Directory directory, IndexWriter writer, FieldInfos fieldInfos, BufferedDeletesStream bufferedDeletesStream) throws IOException {
    this.directory = directory;
    this.writer = writer;
    this.similarity = config.getSimilarity();
    this.maxThreadStates = config.getMaxThreadStates();
    this.fieldInfos = fieldInfos;
    this.bufferedDeletesStream = bufferedDeletesStream;
    flushControl = writer.flushControl;

    consumer = config.getIndexingChain().getChain(this);
    this.config = config;
  }

  // Buffer a specific docID for deletion.  Currently only
  // used when we hit an exception when adding a document
  // (see updateDocument / updateDocuments).
  synchronized void deleteDocID(int docIDUpto) {
    pendingDeletes.addDocID(docIDUpto);
    // NOTE: we do not trigger flush here.  This is
    // potentially a RAM leak, if you have an app that tries
    // to add docs but every single doc always hits a
    // non-aborting exception.  Allowing a flush here gets
    // very messy because we are only invoked when handling
    // exceptions so to do this properly, while handling an
    // exception we'd have to go off and flush new deletes
    // which is risky (likely would hit some other
    // confounding exception).
  }
  
  /** Buffers one pending delete per query, each recorded
   *  together with the current buffered doc count.
   *
   *  @return the value reported by flushControl.waitUpdate */
  boolean deleteQueries(Query... queries) {
    final boolean flushPending = flushControl.waitUpdate(0, queries.length);
    synchronized(this) {
      for (int i = 0; i < queries.length; i++) {
        pendingDeletes.addQuery(queries[i], numDocs);
      }
    }
    return flushPending;
  }
  
  /** Buffers a pending delete for a single query, recorded
   *  together with the current buffered doc count.
   *
   *  @return the value reported by flushControl.waitUpdate */
  boolean deleteQuery(Query query) {
    final boolean flushPending = flushControl.waitUpdate(0, 1);
    synchronized(this) {
      pendingDeletes.addQuery(query, numDocs);
    }
    return flushPending;
  }
  
  /** Buffers one pending delete per term, each recorded
   *  together with the current buffered doc count.
   *
   *  @return the value reported by flushControl.waitUpdate */
  boolean deleteTerms(Term... terms) {
    final boolean flushPending = flushControl.waitUpdate(0, terms.length);
    synchronized(this) {
      for (int i = 0; i < terms.length; i++) {
        pendingDeletes.addTerm(terms[i], numDocs);
      }
    }
    return flushPending;
  }

  // TODO: we could check w/ FreqProxTermsWriter: if the
  // term doesn't exist, don't bother buffering into the
  // per-DWPT map (but still must go into the global map)
  /** Buffers a pending delete for a single term, recorded
   *  together with the current buffered doc count.  The
   *  skipWait flag is passed straight through to
   *  flushControl.waitUpdate.
   *
   *  @return the value reported by flushControl.waitUpdate */
  boolean deleteTerm(Term term, boolean skipWait) {
    final boolean flushPending = flushControl.waitUpdate(0, 1, skipWait);
    synchronized(this) {
      pendingDeletes.addTerm(term, numDocs);
    }
    return flushPending;
  }

  /** Returns the FieldInfos instance supplied to the constructor. */
  public FieldInfos getFieldInfos() {
    return fieldInfos;
  }

  /** Sets the stream that indexing details are printed to
   *  (null disables the output) and pushes it to every
   *  existing thread state. */
  synchronized void setInfoStream(PrintStream infoStream) {
    this.infoStream = infoStream;
    for (DocumentsWriterThreadState threadState : threadStates) {
      threadState.docState.infoStream = infoStream;
    }
  }

  /** Records the new max field length and pushes it to every
   *  existing thread state. */
  synchronized void setMaxFieldLength(int maxFieldLength) {
    this.maxFieldLength = maxFieldLength;
    for (DocumentsWriterThreadState threadState : threadStates) {
      threadState.docState.maxFieldLength = maxFieldLength;
    }
  }

  /** Records the new Similarity and pushes it to every
   *  existing thread state. */
  synchronized void setSimilarity(Similarity similarity) {
    this.similarity = similarity;
    for (DocumentsWriterThreadState threadState : threadStates) {
      threadState.docState.similarity = similarity;
    }
  }

  /** Get current segment name we are writing.  This is null
   *  between a flush (doAfterFlush resets it) and the next
   *  call to getThreadState, which allocates a new name. */
  synchronized String getSegment() {
    return segment;
  }

  /** Returns how many docs are currently buffered in RAM,
   *  i.e. added since the last flush or abort. */
  synchronized int getNumDocs() {
    return numDocs;
  }

  /** Forwards the message, prefixed with "DW: ", to the
   *  owning IndexWriter; a no-op while infoStream is null. */
  void message(String message) {
    if (infoStream == null) {
      return;
    }
    writer.message("DW: " + message);
  }

  /** Marks an abort as pending.  While the flag is set,
   *  waitReady blocks and flush returns without writing a
   *  segment; abort() clears the flag again. */
  synchronized void setAborting() {
    if (infoStream != null) {
      message("setAborting");
    }
    aborting = true;
  }

  /** Called if we hit an exception at a bad time (when
   *  updating the index files) and must discard all
   *  currently buffered docs.  This resets our state,
   *  discarding any docs added since last flush.
   *
   *  <p>Each cleanup step is best-effort: a failure in one
   *  step is swallowed so that the remaining steps still
   *  run.  The aborting flag is always cleared and waiting
   *  threads are always notified, even on failure. */
  synchronized void abort() throws IOException {
    if (infoStream != null) {
      message("docWriter: abort");
    }

    boolean success = false;

    try {

      // Forcefully remove waiting ThreadStates from line
      try {
        waitQueue.abort();
      } catch (Throwable t) {
        // deliberately ignored: keep aborting
      }

      // Wait for all other threads to finish with
      // DocumentsWriter:
      try {
        waitIdle();
      } finally {
        // Runs even if waitIdle was interrupted, so buffered
        // state is discarded regardless
        if (infoStream != null) {
          message("docWriter: abort waitIdle done");
        }

        assert 0 == waitQueue.numWaiting: "waitQueue.numWaiting=" + waitQueue.numWaiting;
        waitQueue.waitingBytes = 0;

        pendingDeletes.clear();

        // Abort each per-thread consumer first, then the
        // shared consumer
        for (DocumentsWriterThreadState threadState : threadStates) {
          try {
            threadState.consumer.abort();
          } catch (Throwable t) {
            // deliberately ignored: keep aborting
          }
        }

        try {
          consumer.abort();
        } catch (Throwable t) {
          // deliberately ignored: keep aborting
        }

        // Reset all postings data
        doAfterFlush();
      }

      success = true;
    } finally {
      // Clear the flag and wake threads blocked in waitReady /
      // waitIdle
      aborting = false;
      notifyAll();
      if (infoStream != null) {
        message("docWriter: done abort; success=" + success);
      }
    }
  }

  /** Returns this writer to its "nothing buffered" state after
   *  a flush or abort: thread affinity, wait queue, segment
   *  name and doc counters are reset, then each thread state
   *  is told to reset itself. */
  private void doAfterFlush() throws IOException {
    // Callers must have idled every ThreadState beforehand
    assert allThreadsIdle();

    threadBindings.clear();
    waitQueue.reset();

    bufferIsFull = false;
    nextDocID = 0;
    numDocs = 0;
    segment = null;

    for (DocumentsWriterThreadState threadState : threadStates) {
      threadState.doAfterFlush();
    }
  }

  /** Returns true when every thread state is idle (trivially
   *  true when none exist yet). */
  private synchronized boolean allThreadsIdle() {
    for (DocumentsWriterThreadState threadState : threadStates) {
      if (!threadState.isIdle) {
        return false;
      }
    }
    return true;
  }

  /** Returns true if any docs are buffered or any deletes are
   *  pending. */
  synchronized boolean anyChanges() {
    if (numDocs != 0) {
      return true;
    }
    return pendingDeletes.any();
  }

  // for testing
  /** Returns the live (not copied) buffer of pending deletes.
   *  Note this accessor is not synchronized. */
  public BufferedDeletes getPendingDeletes() {
    return pendingDeletes;
  }

  /** Moves the pending deletes into the buffered-deletes
   *  stream as a frozen packet and stamps the new segment (if
   *  any) with the matching delete generation.  When there
   *  are no segments at all the pending deletes are simply
   *  dropped. */
  private void pushDeletes(SegmentInfo newSegment, SegmentInfos segmentInfos) {
    // Lock order: DW -> BD
    final long delGen = bufferedDeletesStream.getNextGen();

    if (!pendingDeletes.any()) {
      // Nothing buffered: the segment still gets a generation
      if (newSegment != null) {
        newSegment.setBufferedDeletesGen(delGen);
      }
      return;
    }

    final boolean haveSegments = segmentInfos.size() > 0 || newSegment != null;
    if (!haveSegments) {
      if (infoStream != null) {
        message("flush: drop buffered deletes: no segments");
      }
      // We can safely discard these deletes: since
      // there are no segments, the deletions cannot
      // affect anything.
    } else {
      final FrozenBufferedDeletes frozen = new FrozenBufferedDeletes(pendingDeletes, delGen);
      if (infoStream != null) {
        message("flush: push buffered deletes startSize=" + pendingDeletes.bytesUsed.get() + " frozenSize=" + frozen.bytesUsed);
      }
      bufferedDeletesStream.push(frozen);
      if (infoStream != null) {
        message("flush: delGen=" + frozen.gen);
      }
      if (newSegment != null) {
        newSegment.setBufferedDeletesGen(frozen.gen);
      }
    }

    pendingDeletes.clear();
  }

  /** Returns true if any deletes are pending.  Note this
   *  accessor is not synchronized, unlike anyChanges. */
  public boolean anyDeletions() {
    return pendingDeletes.any();
  }

  /** Flush all pending docs to a new segment.
   *
   *  <p>Waits until all thread states are idle, hands the
   *  buffered state to the consumer chain, optionally packs
   *  the new files into a compound file, writes any
   *  delete-by-docID bit vector, resets this writer and
   *  finally pushes the pending deletes.  On any failure the
   *  buffered docs are discarded via abort().
   *
   *  <p>The caller must hold the lock on {@code writer}.
   *
   *  @return the newly written segment, or null when no docs
   *          were buffered or an abort is pending */
  // Lock order: IW -> DW
  synchronized SegmentInfo flush(IndexWriter writer, IndexFileDeleter deleter, MergePolicy mergePolicy, SegmentInfos segmentInfos) throws IOException {

    final long startTime = System.currentTimeMillis();

    // We change writer's segmentInfos:
    assert Thread.holdsLock(writer);

    waitIdle();

    if (numDocs == 0) {
      // nothing to do!
      if (infoStream != null) {
        message("flush: no docs; skipping");
      }
      // Lock order: IW -> DW -> BD
      pushDeletes(null, segmentInfos);
      return null;
    }

    if (aborting) {
      if (infoStream != null) {
        message("flush: skip because aborting is set");
      }
      return null;
    }

    boolean success = false;

    SegmentInfo newSegment;

    try {
      //System.out.println(Thread.currentThread().getName() + ": nw=" + waitQueue.numWaiting);
      assert nextDocID == numDocs: "nextDocID=" + nextDocID + " numDocs=" + numDocs;
      assert waitQueue.numWaiting == 0: "numWaiting=" + waitQueue.numWaiting;
      assert waitQueue.waitingBytes == 0;

      if (infoStream != null) {
        message("flush postings as segment " + segment + " numDocs=" + numDocs);
      }

      final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segment, fieldInfos,
                                                                 numDocs, writer.getConfig().getTermIndexInterval(),
                                                                 pendingDeletes);
      // Apply delete-by-docID now (delete-byDocID only
      // happens when an exception is hit processing that
      // doc, eg if analyzer has some problem w/ the text):
      if (pendingDeletes.docIDs.size() > 0) {
        flushState.deletedDocs = new BitVector(numDocs);
        for(int delDocID : pendingDeletes.docIDs) {
          flushState.deletedDocs.set(delDocID);
        }
        // These docIDs are now carried by the bit vector, so
        // stop accounting for them in the buffered deletes
        pendingDeletes.bytesUsed.addAndGet(-pendingDeletes.docIDs.size() * BufferedDeletes.BYTES_PER_DEL_DOCID);
        pendingDeletes.docIDs.clear();
      }

      newSegment = new SegmentInfo(segment, numDocs, directory, false, true, fieldInfos.hasProx(), false);

      // A set, because thread states can contribute the same
      // per-thread consumer only once
      Collection<DocConsumerPerThread> threads = new HashSet<DocConsumerPerThread>();
      for (DocumentsWriterThreadState threadState : threadStates) {
        threads.add(threadState.consumer);
      }

      // Captured before the consumer flush for the size
      // report below
      double startMBUsed = bytesUsed()/1024./1024.;

      consumer.flush(threads, flushState);

      newSegment.setHasVectors(flushState.hasVectors);

      if (infoStream != null) {
        message("new segment has " + (flushState.hasVectors ? "vectors" : "no vectors"));
        if (flushState.deletedDocs != null) {
          message("new segment has " + flushState.deletedDocs.count() + " deleted docs");
        }
        message("flushedFiles=" + newSegment.files());
      }

      if (mergePolicy.useCompoundFile(segmentInfos, newSegment)) {
        final String cfsFileName = IndexFileNames.segmentFileName(segment, IndexFileNames.COMPOUND_FILE_EXTENSION);

        if (infoStream != null) {
          message("flush: create compound file \"" + cfsFileName + "\"");
        }

        CompoundFileWriter cfsWriter = new CompoundFileWriter(directory, cfsFileName);
        for(String fileName : newSegment.files()) {
          cfsWriter.addFile(fileName);
        }
        cfsWriter.close();
        // The separate files are now redundant with the CFS
        deleter.deleteNewFiles(newSegment.files());
        newSegment.setUseCompoundFile(true);
      }

      // Must write deleted docs after the CFS so we don't
      // slurp the del file into CFS:
      if (flushState.deletedDocs != null) {
        final int delCount = flushState.deletedDocs.count();
        assert delCount > 0;
        newSegment.setDelCount(delCount);
        newSegment.advanceDelGen();
        final String delFileName = newSegment.getDelFileName();
        if (infoStream != null) {
          message("flush: write " + delCount + " deletes to " + delFileName);
        }
        boolean success2 = false;
        try {
          // TODO: in the NRT case it'd be better to hand
          // this del vector over to the
          // shortly-to-be-opened SegmentReader and let it
          // carry the changes; there's no reason to use
          // filesystem as intermediary here.
          flushState.deletedDocs.write(directory, delFileName);
          success2 = true;
        } finally {
          if (!success2) {
            try {
              directory.deleteFile(delFileName);
            } catch (Throwable t) {
              // suppress this so we keep throwing the
              // original exception
            }
          }
        }
      }

      if (infoStream != null) {
        message("flush: segment=" + newSegment);
        final double newSegmentSizeNoStore = newSegment.sizeInBytes(false)/1024./1024.;
        final double newSegmentSize = newSegment.sizeInBytes(true)/1024./1024.;
        message("  ramUsed=" + nf.format(startMBUsed) + " MB" +
                " newFlushedSize=" + nf.format(newSegmentSize) + " MB" +
                " (" + nf.format(newSegmentSizeNoStore) + " MB w/o doc stores)" +
                " docs/MB=" + nf.format(numDocs / newSegmentSize) +
                " new/old=" + nf.format(100.0 * newSegmentSizeNoStore / startMBUsed) + "%");
      }

      success = true;
    } finally {
      notifyAll();
      if (!success) {
        // Clean up whatever files the failed flush left
        // behind, then discard all buffered docs
        if (segment != null) {
          deleter.refresh(segment);
        }
        abort();
      }
    }

    doAfterFlush();

    // Lock order: IW -> DW -> BD
    pushDeletes(newSegment, segmentInfos);
    if (infoStream != null) {
      message("flush time " + (System.currentTimeMillis()-startTime) + " msec");
    }

    return newSegment;
  }

  /** Marks this writer closed and wakes all waiting threads;
   *  threads blocked in waitReady then throw
   *  AlreadyClosedException. */
  synchronized void close() {
    closed = true;
    notifyAll();
  }

  /** Returns a free (idle) ThreadState that may be used for
   * indexing this one document.  This call also pauses if a
   * flush is pending.  If delTerm is non-null then we
   * buffer this deleted term after the thread state has
   * been acquired.
   *
   * <p>docCount consecutive docIDs are reserved for the
   * caller, starting at the returned state's docState.docID;
   * the returned state is marked non-idle.  Must not be
   * called while holding the IndexWriter's lock. */
  synchronized DocumentsWriterThreadState getThreadState(Term delTerm, int docCount) throws IOException {

    final Thread currentThread = Thread.currentThread();
    assert !Thread.holdsLock(writer);

    // First, find a thread state.  If this thread already
    // has affinity to a specific ThreadState, use that one
    // again.
    DocumentsWriterThreadState state = threadBindings.get(currentThread);
    if (state == null) {

      // First time this thread has called us since last
      // flush.  Find the least loaded thread state:
      DocumentsWriterThreadState minThreadState = null;
      for(int i=0;i<threadStates.length;i++) {
        DocumentsWriterThreadState ts = threadStates[i];
        if (minThreadState == null || ts.numThreads < minThreadState.numThreads) {
          minThreadState = ts;
        }
      }
      // Reuse the least loaded state if it is unused, or if
      // we may not allocate any more states
      if (minThreadState != null && (minThreadState.numThreads == 0 || threadStates.length >= maxThreadStates)) {
        state = minThreadState;
        state.numThreads++;
      } else {
        // Just create a new "private" thread state
        DocumentsWriterThreadState[] newArray = new DocumentsWriterThreadState[1+threadStates.length];
        if (threadStates.length > 0) {
          System.arraycopy(threadStates, 0, newArray, 0, threadStates.length);
        }
        state = newArray[threadStates.length] = new DocumentsWriterThreadState(this);
        threadStates = newArray;
      }
      threadBindings.put(currentThread, state);
    }

    // Next, wait until my thread state is idle (in case
    // it's shared with other threads), and no flush/abort
    // pending
    waitReady(state);

    // Allocate segment name if this is the first doc since
    // last flush:
    if (segment == null) {
      segment = writer.newSegmentName();
      assert numDocs == 0;
    }

    // Reserve docCount docIDs starting at nextDocID
    state.docState.docID = nextDocID;
    nextDocID += docCount;

    if (delTerm != null) {
      // Recorded with the first reserved docID
      pendingDeletes.addTerm(delTerm, state.docState.docID);
    }

    numDocs += docCount;
    state.isIdle = false;
    return state;
  }
  
  /** Adds a document without deleting anything; equivalent
   *  to updateDocument with a null delTerm. */
  boolean addDocument(Document doc, Analyzer analyzer) throws CorruptIndexException, IOException {
    return updateDocument(doc, analyzer, null);
  }
  
  /** Indexes one document; if delTerm is non-null a delete for
   *  that term is buffered when the thread state is acquired.
   *
   *  <p>If processing fails and no abort is pending, the
   *  docID is filled in the wait queue with skipDocWriter and
   *  marked deleted, so the document is added "all or none".
   *  If an abort is pending, abort() is called instead.
   *
   *  @return the flush indication combined from
   *          flushControl.waitUpdate and
   *          flushControl.flushByRAMUsage */
  boolean updateDocument(Document doc, Analyzer analyzer, Term delTerm)
    throws CorruptIndexException, IOException {

    // Possibly trigger a flush, or wait until any running flush completes:
    boolean doFlush = flushControl.waitUpdate(1, delTerm != null ? 1 : 0);

    // This call is synchronized but fast
    final DocumentsWriterThreadState state = getThreadState(delTerm, 1);

    final DocState docState = state.docState;
    docState.doc = doc;
    docState.analyzer = analyzer;

    boolean success = false;
    try {
      // This call is not synchronized and does all the
      // work
      final DocWriter perDoc;
      try {
        perDoc = state.consumer.processDocument();
      } finally {
        // Always drop the doc/analyzer references, even on
        // failure
        docState.clear();
      }

      // This call is synchronized but fast
      finishDocument(state, perDoc);

      success = true;
    } finally {
      if (!success) {

        // If this thread state had decided to flush, we
        // must clear it so another thread can flush
        if (doFlush) {
          flushControl.clearFlushPending();
        }

        if (infoStream != null) {
          message("exception in updateDocument aborting=" + aborting);
        }

        synchronized(this) {

          state.isIdle = true;
          notifyAll();

          if (aborting) {
            abort();
          } else {
            // Fill this docID's slot in the wait queue with the
            // shared skip writer
            skipDocWriter.docID = docState.docID;
            boolean success2 = false;
            try {
              waitQueue.add(skipDocWriter);
              success2 = true;
            } finally {
              if (!success2) {
                abort();
                // NOTE(review): returning from inside finally
                // discards the exception in flight -- confirm
                // this is intended
                return false;
              }
            }

            // Immediately mark this document as deleted
            // since likely it was partially added.  This
            // keeps indexing as "all or none" (atomic) when
            // adding a document:
            deleteDocID(state.docState.docID);
          }
        }
      }
    }

    doFlush |= flushControl.flushByRAMUsage("new document");

    return doFlush;
  }

  /** Indexes a batch of documents under one contiguous block
   *  of docIDs reserved up front.  If delTerm is non-null it
   *  is buffered only after the loop over the batch has
   *  finished, and is recorded with the batch's starting
   *  docID.
   *
   *  <p>If any document fails and no abort is pending, the
   *  remaining reserved docIDs are filled in the wait queue
   *  with skipDocWriter and every docID of the batch is
   *  marked deleted.
   *
   *  @return false if an abort was pending once the loop
   *          ended; otherwise the flush indication combined
   *          from flushControl.waitUpdate and
   *          flushControl.flushByRAMUsage */
  boolean updateDocuments(Collection<Document> docs, Analyzer analyzer, Term delTerm)
    throws CorruptIndexException, IOException {

    // Possibly trigger a flush, or wait until any running flush completes:
    boolean doFlush = flushControl.waitUpdate(docs.size(), delTerm != null ? 1 : 0);

    final int docCount = docs.size();

    // This call is synchronized but fast -- we allocate the
    // N docIDs up front:
    final DocumentsWriterThreadState state = getThreadState(null, docCount);
    final DocState docState = state.docState;

    final int startDocID = docState.docID;
    int docID = startDocID;

    //System.out.println(Thread.currentThread().getName() + ": A " + docCount);
    for(Document doc : docs) {
      docState.doc = doc;
      docState.analyzer = analyzer;
      // Assign next docID from our block:
      docState.docID = docID++;

      boolean success = false;
      try {
        // This call is not synchronized and does all the
        // work
        final DocWriter perDoc;
        try {
          perDoc = state.consumer.processDocument();
        } finally {
          // Always drop the doc/analyzer references, even on
          // failure
          docState.clear();
        }

        // Must call this w/o holding synchronized(this) else
        // we'll hit deadlock:
        balanceRAM();

        // Synchronized but fast
        synchronized(this) {
          if (aborting) {
            // NOTE(review): success is still false here, so
            // the finally block below runs its failure path
            // before the loop exits -- confirm intended
            break;
          }
          assert perDoc == null || perDoc.docID == docState.docID;
          final boolean doPause;
          if (perDoc != null) {
            doPause = waitQueue.add(perDoc);
          } else {
            // Nothing to write for this doc: fill its slot
            // with the shared skip writer
            skipDocWriter.docID = docState.docID;
            doPause = waitQueue.add(skipDocWriter);
          }
          if (doPause) {
            waitForWaitQueue();
          }
        }

        success = true;
      } finally {
        if (!success) {
          //System.out.println(Thread.currentThread().getName() + ": E");

          // If this thread state had decided to flush, we
          // must clear it so another thread can flush
          if (doFlush) {
            message("clearFlushPending!");
            flushControl.clearFlushPending();
          }

          if (infoStream != null) {
            message("exception in updateDocuments aborting=" + aborting);
          }

          synchronized(this) {

            state.isIdle = true;
            notifyAll();

            if (aborting) {
              abort();
            } else {

              // Fill hole in the doc stores for all
              // docIDs we pre-allocated
              //System.out.println(Thread.currentThread().getName() + ": F " + docCount);
              final int endDocID = startDocID + docCount;
              docID = docState.docID;
              while(docID < endDocID) {
                skipDocWriter.docID = docID++;
                boolean success2 = false;
                try {
                  waitQueue.add(skipDocWriter);
                  success2 = true;
                } finally {
                  if (!success2) {
                    abort();
                    // NOTE(review): returning from inside
                    // finally discards the exception in
                    // flight -- confirm this is intended
                    return false;
                  }
                }
              }
              //System.out.println(Thread.currentThread().getName() + ":   F " + docCount + " done");

              // Mark all pre-allocated docIDs as deleted:
              docID = startDocID;
              while(docID < startDocID + docs.size()) {
                deleteDocID(docID++);
              }
            }
          }
        }
      }
    }
    //System.out.println(Thread.currentThread().getName() + ":   A " + docCount);

    synchronized(this) {
      if (aborting) {

        // We are currently aborting, and another thread is
        // waiting for me to become idle.  We just forcefully
        // idle this threadState; it will be fully reset by
        // abort()
        state.isIdle = true;

        // wakes up any threads waiting on the wait queue
        notifyAll();

        abort();

        if (doFlush) {
          message("clearFlushPending!");
          flushControl.clearFlushPending();
        }

        return false;
      }

      // Apply delTerm only after all indexing has
      // succeeded, but apply it only to docs prior to when
      // this batch started:
      if (delTerm != null) {
        pendingDeletes.addTerm(delTerm, startDocID);
      }

      state.isIdle = true;

      // wakes up any threads waiting on the wait queue
      notifyAll();
    }

    doFlush |= flushControl.flushByRAMUsage("new document");

    //System.out.println(Thread.currentThread().getName() + ":   B " + docCount);
    return doFlush;
  }

  /**
   * Blocks the calling thread, waiting on this object's monitor, until
   * {@code allThreadsIdle()} returns true.
   *
   * @throws ThreadInterruptedException if the thread is interrupted while
   *         waiting; the original InterruptedException is passed to it
   */
  public synchronized void waitIdle() {
    for (;;) {
      if (allThreadsIdle()) {
        return;
      }
      try {
        wait();
      } catch (InterruptedException interrupted) {
        throw new ThreadInterruptedException(interrupted);
      }
    }
  }

  /**
   * Waits on this object's monitor until the given thread state is idle and
   * no abort is in progress, or until this writer has been closed.
   *
   * @param state the thread state the caller wants to use
   * @throws AlreadyClosedException if {@code closed} is set once waiting ends
   * @throws ThreadInterruptedException if the thread is interrupted while
   *         waiting
   */
  synchronized void waitReady(DocumentsWriterThreadState state) {
    while (!closed) {
      if (state.isIdle && !aborting) {
        // The state is free and nothing is aborting: stop waiting.
        break;
      }
      try {
        wait();
      } catch (InterruptedException interrupted) {
        throw new ThreadInterruptedException(interrupted);
      }
    }

    if (closed) {
      throw new AlreadyClosedException("this IndexWriter is closed");
    }
  }

  /** Performs the synchronized portion of completing one document:
   *  hands the document's writer (or the skip placeholder when
   *  {@code docWriter} is null) to the wait queue, then marks the
   *  thread state idle and notifies waiters.  If an abort is in
   *  progress the writer is aborted instead of being queued.
   *
   *  @param perThread thread state that processed the document
   *  @param docWriter writer for the document, or null
   *  @throws IOException if adding to the wait queue fails */
  private void finishDocument(DocumentsWriterThreadState perThread, DocWriter docWriter) throws IOException {

    // This must run before we take synchronized(this);
    // calling it while holding the lock would deadlock:
    balanceRAM();

    synchronized(this) {

      assert docWriter == null || docWriter.docID == perThread.docState.docID;

      if (!aborting) {

        // Normal path: enqueue the real writer, or the shared
        // placeholder stamped with this document's docID.
        final DocWriter pending;
        if (docWriter == null) {
          skipDocWriter.docID = perThread.docState.docID;
          pending = skipDocWriter;
        } else {
          pending = docWriter;
        }

        if (waitQueue.add(pending)) {
          // The queue asked us to pause
          waitForWaitQueue();
        }

      } else if (docWriter != null) {

        // We are currently aborting, and another thread is
        // waiting for this thread state to become idle.  Abort
        // the writer on a best-effort basis; the thread state
        // will be fully reset by abort()
        try {
          docWriter.abort();
        } catch (Throwable t) {
        }
      }

      // On both paths: forcefully idle this thread state and
      // wake up any threads waiting on this monitor
      perThread.isIdle = true;
      notifyAll();
    }
  }

  /**
   * Waits on this object's monitor at least once, and then repeatedly until
   * {@code waitQueue.doResume()} returns true.
   *
   * @throws ThreadInterruptedException if the thread is interrupted while
   *         waiting
   */
  synchronized void waitForWaitQueue() {
    boolean canResume;
    do {
      try {
        wait();
      } catch (InterruptedException interrupted) {
        throw new ThreadInterruptedException(interrupted);
      }
      canResume = waitQueue.doResume();
    } while (!canResume);
  }

  /** Placeholder DocWriter whose operations all do nothing.  It is
   *  handed to the wait queue in place of a real writer so that a
   *  docID with no writer of its own still occupies its slot. */
  private static class SkipDocWriter extends DocWriter {
    @Override
    long sizeInBytes() {
      // A placeholder holds no buffered data
      return 0L;
    }
    @Override
    void abort() {
      // Nothing to discard
    }
    @Override
    void finish() {
      // Nothing to write
    }
  }

  /* Single shared placeholder; its docID is assigned just before each use */
  final SkipDocWriter skipDocWriter = new SkipDocWriter();

  /* General-purpose number formatter used when rendering
     RAM figures in diagnostic messages */
  NumberFormat nf = NumberFormat.getInstance();

  /* Geometry of the shared byte[] blocks used to store
     postings data: each block is 1 << 15 = 32768 bytes.
     MASK selects the offset within a block; NOT_MASK
     clears that offset. */
  static final int BYTE_BLOCK_SHIFT = 15;
  static final int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;
  static final int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;
  static final int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;

  /** Hands out fixed-size byte[] blocks and keeps returned blocks on
   *  a free list for reuse.  The methods of this class touch the
   *  free list only while holding the enclosing DocumentsWriter's
   *  monitor. */
  private class ByteBlockAllocator extends ByteBlockPool.Allocator {
    /* Length, in bytes, of every block this allocator creates */
    final int blockSize;

    ByteBlockAllocator(int blockSize) {
      this.blockSize = blockSize;
    }

    /* Recycled blocks awaiting reuse.  Constructed with an explicit
       type argument (not a raw ArrayList) so the assignment is
       type-checked and compiles without an unchecked warning. */
    ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
    
    /* Allocate another byte[] from the shared pool: reuse the most
       recently recycled block if there is one, otherwise create a
       new block and add its size to bytesUsed */
    @Override
    byte[] getByteBlock() {
      synchronized(DocumentsWriter.this) {
        final int size = freeByteBlocks.size();
        final byte[] b;
        if (0 == size) {
          b = new byte[blockSize];
          bytesUsed.addAndGet(blockSize);
        } else {
          b = freeByteBlocks.remove(size-1);
        }
        return b;
      }
    }

    /* Return byte[]'s to the pool: blocks[start..end) are moved to
       the free list and their slots in the caller's array are
       cleared */

    @Override
    void recycleByteBlocks(byte[][] blocks, int start, int end) {
      synchronized(DocumentsWriter.this) {
        for(int i=start;i<end;i++) {
          freeByteBlocks.add(blocks[i]);
          blocks[i] = null;
        }
      }
    }

    /* Same as above for a List: every element is moved to the free
       list and replaced by null in the caller's list (the list
       keeps its size) */
    @Override
    void recycleByteBlocks(List<byte[]> blocks) {
      synchronized(DocumentsWriter.this) {
        final int size = blocks.size();
        for(int i=0;i<size;i++) {
          freeByteBlocks.add(blocks.get(i));
          blocks.set(i, null);
        }
      }
    }
  }

  /* Geometry of the shared int[] blocks used to store
     postings data: each block holds 1 << 13 = 8192 ints.
     MASK selects the offset within a block. */
  static final int INT_BLOCK_SHIFT = 13;
  static final int INT_BLOCK_SIZE = 1 << INT_BLOCK_SHIFT;
  static final int INT_BLOCK_MASK = INT_BLOCK_SIZE - 1;

  /* Recycled int[] blocks awaiting reuse.  Constructed with an
     explicit type argument (not a raw ArrayList) so the assignment
     is type-checked and compiles without an unchecked warning. */
  private List<int[]> freeIntBlocks = new ArrayList<int[]>();

  /* Hand out an int[] block from the shared pool.  The most
     recently recycled block is reused when one is available;
     otherwise a new block of INT_BLOCK_SIZE ints is created and
     its size in bytes is added to bytesUsed. */
  synchronized int[] getIntBlock() {
    final int free = freeIntBlocks.size();
    if (free != 0) {
      return freeIntBlocks.remove(free - 1);
    }
    final int[] fresh = new int[INT_BLOCK_SIZE];
    bytesUsed.addAndGet(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT);
    return fresh;
  }

  /* Adjust the tracked byte count by numBytes (the delta is
     simply added to the counter; the resulting value is not
     needed here) */
  synchronized void bytesUsed(long numBytes) {
    bytesUsed.getAndAdd(numBytes);
  }

  /* Current byte count: this writer's own counter plus the
     counter kept by pendingDeletes */
  long bytesUsed() {
    final long writerBytes = bytesUsed.get();
    final long pendingDeleteBytes = pendingDeletes.bytesUsed.get();
    return writerBytes + pendingDeleteBytes;
  }

  /* Give int[] blocks back to the pool: blocks[start..end) are
     appended to the free list and their slots in the caller's
     array are cleared */
  synchronized void recycleIntBlocks(int[][] blocks, int start, int end) {
    int slot = start;
    while (slot < end) {
      freeIntBlocks.add(blocks[slot]);
      blocks[slot] = null;
      slot++;
    }
  }

  /* Allocator for blocks of BYTE_BLOCK_SIZE bytes */
  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);

  /* Size, in bytes, of the blocks handed out by perDocAllocator */
  static final int PER_DOC_BLOCK_SIZE = 1024;

  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);


  /* Geometry of the shared char[] blocks used to store
     term text: each block holds 1 << 14 = 16384 chars.
     MASK selects the offset within a block. */
  static final int CHAR_BLOCK_SHIFT = 14;
  static final int CHAR_BLOCK_SIZE = 1 << CHAR_BLOCK_SHIFT;
  static final int CHAR_BLOCK_MASK = CHAR_BLOCK_SIZE - 1;

  /* One char less than a full char block -- presumably so a term
     plus one extra char still fits in a single block (TODO confirm
     against the code that enforces this limit) */
  static final int MAX_TERM_LENGTH = CHAR_BLOCK_SIZE-1;

  /* Recycled char[] blocks awaiting reuse.  Constructed with an
     explicit type argument (not a raw ArrayList) so the assignment
     is type-checked and compiles without an unchecked warning. */
  private ArrayList<char[]> freeCharBlocks = new ArrayList<char[]>();

  /* Hand out a char[] block from the shared pool.  The most
     recently recycled block is reused when one is available;
     otherwise the size of a new block is added to bytesUsed and
     a block of CHAR_BLOCK_SIZE chars is created. */
  synchronized char[] getCharBlock() {
    final int free = freeCharBlocks.size();
    if (free != 0) {
      return freeCharBlocks.remove(free - 1);
    }
    // Allocations of char blocks are always tracked, for now,
    // because nothing that skips allocation tracking
    // (currently only term vectors) uses its own char
    // blocks.
    bytesUsed.addAndGet(CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
    return new char[CHAR_BLOCK_SIZE];
  }

  /* Give char[] blocks back to the pool: the first numBlocks
     entries of blocks are appended to the free list and their
     slots in the caller's array are cleared */
  synchronized void recycleCharBlocks(char[][] blocks, int numBlocks) {
    int slot = 0;
    while (slot < numBlocks) {
      freeCharBlocks.add(blocks[slot]);
      blocks[slot] = null;
      slot++;
    }
  }

  /* Render a byte count as megabytes (bytes / 1024 / 1024),
     formatted with nf */
  String toMB(long v) {
    final double megabytes = v / 1024. / 1024.;
    return nf.format(megabytes);
  }

  /* We have four pools of RAM: Postings, byte blocks
   * (holds freq/prox posting data), char blocks (holds
   * characters in the term) and per-doc buffers (stored fields/term vectors).
   * Different docs require varying amounts of storage from
   * these four classes.
   *
   * For example, docs with many unique single-occurrence
   * short terms will use up the Postings RAM and hardly any
   * of the others.  Whereas docs with very large terms
   * will use a lot of char blocks RAM and relatively less of
   * the others.  This method just frees allocations from
   * the pools once we are over-budget, which balances the
   * pools to match the current docs.
   *
   * Concretely: nothing happens when auto-flush by RAM is
   * disabled or bufferIsFull is already set.  Otherwise, once
   * bytesUsed() plus the buffered-deletes RAM reaches the
   * configured buffer size, recycled blocks are dropped from
   * the free lists (byte, char, int, per-doc) and the consumer
   * is asked to free RAM, in rotation, until usage is at or
   * below 95% of the buffer size or nothing is left to free.
   * In the latter case bufferIsFull is set if usage still
   * exceeds the buffer size. */
  void balanceRAM() {

    final boolean doBalance;
    final long deletesRAMUsed;

    // Sampled once here; the same value is used for every
    // comparison below
    deletesRAMUsed = bufferedDeletesStream.bytesUsed();

    // Configured RAM budget in bytes, or the
    // DISABLE_AUTO_FLUSH sentinel
    final long ramBufferSize;
    final double mb = config.getRAMBufferSizeMB();
    if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
      ramBufferSize = IndexWriterConfig.DISABLE_AUTO_FLUSH;
    } else {
      ramBufferSize = (long) (mb*1024*1024);
    }

    synchronized(this) {
      if (ramBufferSize == IndexWriterConfig.DISABLE_AUTO_FLUSH || bufferIsFull) {
        return;
      }
    
      doBalance = bytesUsed() + deletesRAMUsed >= ramBufferSize;
    }

    if (doBalance) {

      // NOTE: the free-list sizes below are read without
      // holding the lock; they are only used for this
      // diagnostic message
      if (infoStream != null) {
        message("  RAM: balance allocations: usedMB=" + toMB(bytesUsed()) +
                " vs trigger=" + toMB(ramBufferSize) +
                " deletesMB=" + toMB(deletesRAMUsed) +
                " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
                " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
                " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_CHAR));
      }

      // Remembered only to report how much was freed
      final long startBytesUsed = bytesUsed() + deletesRAMUsed;

      // Selects which pool to free from on each pass (iter % 5)
      int iter = 0;

      // We free equally from each pool in 32 KB
      // chunks until we are below our threshold
      // (freeLevel)

      // Becomes false once consumer.freeRAM() reports it
      // freed nothing
      boolean any = true;

      final long freeLevel = (long) (0.95 * ramBufferSize);

      while(bytesUsed()+deletesRAMUsed > freeLevel) {
      
        synchronized(this) {
          if (0 == perDocAllocator.freeByteBlocks.size() 
              && 0 == byteBlockAllocator.freeByteBlocks.size() 
              && 0 == freeCharBlocks.size() 
              && 0 == freeIntBlocks.size() 
              && !any) {
            // Nothing else to free -- must flush now.
            bufferIsFull = bytesUsed()+deletesRAMUsed > ramBufferSize;
            if (infoStream != null) {
              if (bytesUsed()+deletesRAMUsed > ramBufferSize) {
                message("    nothing to free; set bufferIsFull");
              } else {
                message("    nothing to free");
              }
            }
            break;
          }

          // Pass 0 of 5: drop one recycled byte block
          if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
            byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
            bytesUsed.addAndGet(-BYTE_BLOCK_SIZE);
          }

          // Pass 1 of 5: drop one recycled char block
          if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
            freeCharBlocks.remove(freeCharBlocks.size()-1);
            bytesUsed.addAndGet(-CHAR_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_CHAR);
          }

          // Pass 2 of 5: drop one recycled int block
          if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
            freeIntBlocks.remove(freeIntBlocks.size()-1);
            bytesUsed.addAndGet(-INT_BLOCK_SIZE * RamUsageEstimator.NUM_BYTES_INT);
          }

          // Pass 3 of 5: drop recycled per-doc blocks
          if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
            // Remove up to 32 blocks (each block is 1K)
            for (int i = 0; i < 32; ++i) {
              perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
              bytesUsed.addAndGet(-PER_DOC_BLOCK_SIZE);
              if (perDocAllocator.freeByteBlocks.size() == 0) {
                break;
              }
            }
          }
        }

        // Pass 4 of 5; note this call is made without
        // holding synchronized(this)
        if ((4 == iter % 5) && any) {
          // Ask consumer to free any recycled state
          any = consumer.freeRAM();
        }

        iter++;
      }

      if (infoStream != null) {
        message("    after free: freedMB=" + nf.format((startBytesUsed-bytesUsed()-deletesRAMUsed)/1024./1024.) + " usedMB=" + nf.format((bytesUsed()+deletesRAMUsed)/1024./1024.));
      }
    }
  }

  /* The single queue through which finished documents are
     written in docID order */
  final WaitQueue waitQueue = new WaitQueue();

  /** Writes finished documents strictly in increasing docID
   *  order.  A document that finishes before its predecessors is
   *  parked in the circular {@code waiting} array until every
   *  lower docID has been written. */
  private class WaitQueue {
    // Parked documents; the slot at nextWriteLoc corresponds to
    // nextWriteDocID and later docIDs follow, wrapping around
    DocWriter[] waiting;
    // docID of the next document to be written
    int nextWriteDocID;
    // Index into waiting that corresponds to nextWriteDocID
    int nextWriteLoc;
    // Number of documents currently parked in waiting
    int numWaiting;
    // Sum of sizeInBytes() over the parked documents
    long waitingBytes;

    public WaitQueue() {
      waiting = new DocWriter[10];
    }

    /** Restarts docID numbering at 0; the queue must be empty. */
    synchronized void reset() {
      // NOTE: nextWriteLoc doesn't need to be reset
      assert numWaiting == 0;
      assert waitingBytes == 0;
      nextWriteDocID = 0;
    }

    /** Returns true once the parked bytes are at or below the
     *  resume threshold: 5% of the configured RAM buffer, or
     *  2 MB when auto-flush by RAM is disabled. */
    synchronized boolean doResume() {
      final double mb = config.getRAMBufferSizeMB();
      final long waitQueueResumeBytes;
      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
        waitQueueResumeBytes = 2*1024*1024;
      } else {
        waitQueueResumeBytes = (long) (mb*1024*1024*0.05);
      }
      return waitingBytes <= waitQueueResumeBytes;
    }

    /** Returns true when the parked bytes exceed the pause
     *  threshold: 10% of the configured RAM buffer, or 4 MB
     *  when auto-flush by RAM is disabled. */
    synchronized boolean doPause() {
      final double mb = config.getRAMBufferSizeMB();
      final long waitQueuePauseBytes;
      if (mb == IndexWriterConfig.DISABLE_AUTO_FLUSH) {
        waitQueuePauseBytes = 4*1024*1024;
      } else {
        waitQueuePauseBytes = (long) (mb*1024*1024*0.1);
      }
      return waitingBytes > waitQueuePauseBytes;
    }

    /** Aborts every parked document and empties the queue. */
    synchronized void abort() {
      int count = 0;
      for(int i=0;i<waiting.length;i++) {
        final DocWriter doc = waiting[i];
        if (doc != null) {
          doc.abort();
          waiting[i] = null;
          count++;
        }
      }
      waitingBytes = 0;
      // Every parked document must have been found above
      assert count == numWaiting;
      numWaiting = 0;
    }

    /** Finishes {@code doc} and advances the write position by
     *  one docID/slot.  If anything in that sequence throws,
     *  setAborting() is called before the exception propagates. */
    private void writeDocument(DocWriter doc) throws IOException {
      assert doc == skipDocWriter || nextWriteDocID == doc.docID;
      boolean success = false;
      try {
        doc.finish();
        nextWriteDocID++;
        nextWriteLoc++;
        assert nextWriteLoc <= waiting.length;
        // Wrap around the end of the circular array
        if (nextWriteLoc == waiting.length) {
          nextWriteLoc = 0;
        }
        success = true;
      } finally {
        if (!success) {
          setAborting();
        }
      }
    }

    /** Accepts a finished document.  If it is the next docID to
     *  write, it is written immediately, followed by any parked
     *  documents that directly follow it; otherwise it is parked
     *  (growing the array if needed).
     *
     *  @return the result of doPause() after the document has
     *          been handled */
    synchronized public boolean add(DocWriter doc) throws IOException {

      assert doc.docID >= nextWriteDocID;

      if (doc.docID == nextWriteDocID) {
        writeDocument(doc);
        // Drain parked documents that are now next in line,
        // stopping at the first empty slot
        while(true) {
          doc = waiting[nextWriteLoc];
          if (doc != null) {
            numWaiting--;
            waiting[nextWriteLoc] = null;
            waitingBytes -= doc.sizeInBytes();
            writeDocument(doc);
          } else {
            break;
          }
        }
      } else {

        // I finished before documents that were added
        // before me.  This can easily happen when I am a
        // small doc and the docs before me were large, or,
        // just due to luck in the thread scheduling.  Just
        // add myself to the queue and when that large doc
        // finishes, it will flush me:
        int gap = doc.docID - nextWriteDocID;
        if (gap >= waiting.length) {
          // Grow queue
          DocWriter[] newArray = new DocWriter[ArrayUtil.oversize(gap, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
          assert nextWriteLoc >= 0;
          // Unwrap the circular contents so that the slot for
          // nextWriteDocID lands at index 0 of the new array
          System.arraycopy(waiting, nextWriteLoc, newArray, 0, waiting.length-nextWriteLoc);
          System.arraycopy(waiting, 0, newArray, waiting.length-nextWriteLoc, nextWriteLoc);
          nextWriteLoc = 0;
          waiting = newArray;
          gap = doc.docID - nextWriteDocID;
        }

        // Slot for this docID, relative to the write position
        int loc = nextWriteLoc + gap;
        if (loc >= waiting.length) {
          loc -= waiting.length;
        }

        // We should only wrap one time
        assert loc < waiting.length;

        // Nobody should be in my spot!
        assert waiting[loc] == null;
        waiting[loc] = doc;
        numWaiting++;
        waitingBytes += doc.sizeInBytes();
      }
      
      return doPause();
    }
  }
}

Other Lucene examples (source code examples)

Here is a short list of links related to this Lucene DocumentsWriter.java source code file:



my book on functional programming

 

new blog posts

 

Copyright 1998-2019 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.