alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Lucene example source code file (ConcurrentMergeScheduler.java)

This example Lucene source code file (ConcurrentMergeScheduler.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Lucene tags/keywords

arraylist, comparator, concurrentmergescheduler, illegalargumentexception, illegalargumentexception, interruptedexception, io, ioexception, ioexception, list, mergethread, mergethread, override, override, threadinterruptedexception, util

The Lucene ConcurrentMergeScheduler.java source code

package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.store.Directory;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.CollectionUtil;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Comparator;

/** A {@link MergeScheduler} that runs each merge using a
 *  separate thread.
 *
 *  <p>Specify the max number of threads that may run at
 *  once with {@link #setMaxThreadCount}.</p>
 *
 *  <p>Separately specify the maximum number of simultaneous
 *  merges with {@link #setMaxMergeCount}.  If the number of
 *  merges exceeds the max number of threads then the
 *  largest merges are paused until one of the smaller
 *  merges completes.</p>
 *
 *  <p>If more than {@link #getMaxMergeCount} merges are
 *  requested then this class will forcefully throttle the
 *  incoming threads by pausing until one or more merges
 *  complete.</p>
 */ 
public class ConcurrentMergeScheduler extends MergeScheduler {

  // Base priority for merge threads.  -1 is the "not yet
  // initialized" marker checked by initMergeThreadPriority().
  private int mergeThreadPriority = -1;

  // Merge threads launched by this scheduler; dead threads
  // are pruned from this list by updateMergeThreads().
  protected List<MergeThread> mergeThreads = new ArrayList<MergeThread>();

  // Max number of merge threads allowed to be running at
  // once.  When there are more merges than this, we
  // forcefully pause the larger ones, letting the smaller
  // ones run, up until maxMergeCount merges at which point
  // we forcefully pause incoming threads (that presumably
  // are the ones causing so much merging).  We dynamically
  // default this from 1 to 3, depending on how many cores
  // you have:
  private int maxThreadCount = Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors()/2));

  // Max number of merges we accept before forcefully
  // throttling the incoming threads
  private int maxMergeCount = maxThreadCount+2;

  // Directory of the writer most recently passed to merge()
  protected Directory dir;

  // Set by close(); read without the scheduler lock, hence volatile
  private volatile boolean closed;

  // Writer most recently passed to merge(); null until then
  protected IndexWriter writer;

  // Running counter used only to number merge thread names
  protected int mergeThreadCount;

  public ConcurrentMergeScheduler() {
    if (allInstances != null) {
      // Only for testing
      addMyself();
    }
  }

  /** Sets the maximum number of merge threads that may be
   *  running at the same time.
   *
   *  @param count new limit; must be at least 1 and no
   *         larger than {@link #getMaxMergeCount}
   *  @throws IllegalArgumentException if count is out of
   *          that range */
  public void setMaxThreadCount(int count) {
    if (count < 1) {
      throw new IllegalArgumentException("count should be at least 1");
    } else if (count > maxMergeCount) {
      throw new IllegalArgumentException("count should be <= maxMergeCount (= " + maxMergeCount + ")");
    }
    maxThreadCount = count;
  }

  /** Returns the current limit on simultaneously running
   *  merge threads.
   *
   *  @see #setMaxThreadCount(int) */
  public int getMaxThreadCount() {
    return this.maxThreadCount;
  }

  /** Sets the maximum number of simultaneous merges that
   *  are accepted.  Once a merge is needed while more than
   *  this many merge threads are already active, the
   *  incoming thread (the one calling add/updateDocument)
   *  blocks until a merge thread has completed.  Only the
   *  smallest {@link #getMaxThreadCount} merges actually
   *  run at any one time.
   *
   *  @param count new limit; must be at least 1 and no
   *         smaller than {@link #getMaxThreadCount}
   *  @throws IllegalArgumentException if count is out of
   *          that range */
  public void setMaxMergeCount(int count) {
    if (count < 1) {
      throw new IllegalArgumentException("count should be at least 1");
    } else if (count < maxThreadCount) {
      throw new IllegalArgumentException("count should be >= maxThreadCount (= " + maxThreadCount + ")");
    }
    maxMergeCount = count;
  }

  /** Returns the current limit on simultaneous merges.
   *
   *  @see #setMaxMergeCount(int) */
  public int getMaxMergeCount() {
    return this.maxMergeCount;
  }

  /** Returns the base priority that merge threads run at.
   *  If no priority has been set yet, it is first
   *  initialized to 1 plus the priority of the calling
   *  thread, capped at {@link Thread#MAX_PRIORITY}.
   *
   *  @return the base merge thread priority */
  public synchronized int getMergeThreadPriority() {
    // Make sure the lazy default has been computed
    // before handing the value out.
    initMergeThreadPriority();
    final int basePriority = mergeThreadPriority;
    return basePriority;
  }

  /** Sets the base priority that merge threads run at and
   *  re-applies priorities to the current merge threads.
   *  CMS may raise the priority of some merge threads above
   *  this base value, so it is best to keep it no higher
   *  than Thread.MAX_PRIORITY-maxThreadCount to leave room
   *  for relative priorities among threads.
   *
   *  @param pri new base priority, within
   *         Thread.MIN_PRIORITY .. Thread.MAX_PRIORITY
   *  @throws IllegalArgumentException if pri is outside
   *          that range */
  public synchronized void setMergeThreadPriority(int pri) {
    final boolean outOfRange = pri < Thread.MIN_PRIORITY || pri > Thread.MAX_PRIORITY;
    if (outOfRange) {
      throw new IllegalArgumentException("priority must be in range " + Thread.MIN_PRIORITY + " .. " + Thread.MAX_PRIORITY + " inclusive");
    }
    mergeThreadPriority = pri;
    updateMergeThreads();
  }

  /** Orders merge threads so that larger merges (by total
   *  doc count of the current merge) come first.  A thread
   *  without a current merge is treated as having the
   *  largest possible size, so it sorts to the front. */
  protected static final Comparator<MergeThread> compareByMergeDocCount = new Comparator<MergeThread>() {
    public int compare(MergeThread t1, MergeThread t2) {
      final MergePolicy.OneMerge m1 = t1.getCurrentMerge();
      final MergePolicy.OneMerge m2 = t2.getCurrentMerge();

      final int c1 = m1 == null ? Integer.MAX_VALUE : m1.totalDocCount;
      final int c2 = m2 == null ? Integer.MAX_VALUE : m2.totalDocCount;

      // Descending order.  Compare explicitly rather than
      // subtracting, so the result cannot overflow.
      if (c1 == c2) {
        return 0;
      }
      return c1 > c2 ? -1 : 1;
    }
  };

  /**
   * Called whenever the running merges have changed, to pause &amp; unpause
   * threads. Dead threads are first pruned from {@link #mergeThreads}; the
   * remaining threads that still have a current merge are sorted by merge
   * size in descending order and then paused/unpaused from first to last --
   * that way, only the largest merges are paused and the smallest
   * {@link #getMaxThreadCount} merges keep running. Each running thread is
   * also assigned a priority, starting at the base merge thread priority and
   * increasing by one per thread up to {@link Thread#MAX_PRIORITY}.
   */
  protected synchronized void updateMergeThreads() {

    // Only look at threads that are alive & not in the
    // process of stopping (ie have an active merge):
    final List<MergeThread> activeMerges = new ArrayList<MergeThread>();

    int threadIdx = 0;
    while (threadIdx < mergeThreads.size()) {
      final MergeThread mergeThread = mergeThreads.get(threadIdx);
      if (!mergeThread.isAlive()) {
        // Prune any dead threads; do not advance the index
        // since the next element has shifted into this slot.
        mergeThreads.remove(threadIdx);
        continue;
      }
      if (mergeThread.getCurrentMerge() != null) {
        activeMerges.add(mergeThread);
      }
      threadIdx++;
    }

    // Sort the merge threads in descending order.
    CollectionUtil.mergeSort(activeMerges, compareByMergeDocCount);

    int pri = mergeThreadPriority;
    final int activeMergeCount = activeMerges.size();
    for (threadIdx=0;threadIdx<activeMergeCount;threadIdx++) {
      final MergeThread mergeThread = activeMerges.get(threadIdx);
      final MergePolicy.OneMerge merge = mergeThread.getCurrentMerge();
      if (merge == null) {
        // The thread finished since the list was built
        continue;
      }

      // Pause the thread if it is among the largest merges that
      // exceed maxThreadCount; the last maxThreadCount entries
      // (the smallest merges) are left running.
      final boolean doPause = threadIdx < activeMergeCount - maxThreadCount;

      // Only touch (and log) the pause state when it actually changes
      if (doPause != merge.getPause()) {
        if (verbose()) {
          if (doPause) {
            message("pause thread " + mergeThread.getName());
          } else {
            message("unpause thread " + mergeThread.getName());
          }
        }
        merge.setPause(doPause);
      }

      if (!doPause) {
        if (verbose()) {
          message("set priority of merge thread " + mergeThread.getName() + " to " + pri);
        }
        mergeThread.setThreadPriority(pri);
        // Later (smaller) merges get a higher priority, capped at the max
        pri = Math.min(Thread.MAX_PRIORITY, 1+pri);
      }
    }
  }

  /**
   * Tells whether verbose output is enabled, i.e. whether a writer has been
   * set and that writer reports itself as verbose. Intended to guard calls
   * to {@link #message(String)}:
   * 
   * <pre>
   * if (verbose()) {
   *   message("your message");
   * }
   * </pre>
   */
  protected boolean verbose() {
    if (writer == null) {
      return false;
    }
    return writer.verbose();
  }
  
  /**
   * Forwards the given message, prefixed with "CMS: ", to the writer.
   * Callers are expected to have checked {@link #verbose()} first.
   */
  protected void message(String message) {
    final String prefixed = "CMS: " + message;
    writer.message(prefixed);
  }

  /** Lazily picks the default base priority the first
   *  time it is needed; does nothing once a priority has
   *  been set. */
  private synchronized void initMergeThreadPriority() {
    if (mergeThreadPriority != -1) {
      return;
    }
    // Default to slightly higher priority than our
    // calling thread, but never above the maximum
    final int callerPriority = Thread.currentThread().getPriority();
    mergeThreadPriority = Math.min(Thread.MAX_PRIORITY, 1 + callerPriority);
  }

  /** Marks this scheduler as closed and then waits for
   *  all running merge threads to finish. */
  @Override
  public void close() {
    this.closed = true;
    this.sync();
  }

  /** Blocks until no merge thread in {@link #mergeThreads}
   *  is alive any more.
   *
   *  @throws ThreadInterruptedException if the calling
   *          thread is interrupted while waiting */
  public void sync() {
    while (true) {
      // Pick one live thread while holding the lock, but
      // join it outside the lock.
      MergeThread alive = null;
      synchronized (this) {
        for (MergeThread candidate : mergeThreads) {
          if (candidate.isAlive()) {
            alive = candidate;
            break;
          }
        }
      }
      if (alive == null) {
        // Nothing left to wait for
        return;
      }
      try {
        alive.join();
      } catch (InterruptedException ie) {
        throw new ThreadInterruptedException(ie);
      }
    }
  }

  /**
   * Returns the number of merge threads that are alive and still have a
   * current merge. Note that this number is &lt;= {@link #mergeThreads} size.
   */
  protected synchronized int mergeThreadCount() {
    int active = 0;
    for (MergeThread candidate : mergeThreads) {
      if (!candidate.isAlive()) {
        continue;
      }
      if (candidate.getCurrentMerge() == null) {
        continue;
      }
      active++;
    }
    return active;
  }

  /**
   * Pulls pending merges from the given writer, one at a time, and launches
   * a new {@link MergeThread} for each until the writer has none left. Before
   * each pull, the calling thread is stalled while more than
   * {@link #getMaxMergeCount} merge threads are active.
   *
   * @param writer the writer whose pending merges should be run; it is also
   *        remembered as this scheduler's current writer
   * @throws IOException if creating the merge thread fails
   * @throws ThreadInterruptedException if the calling thread is interrupted
   *         while stalled
   */
  @Override
  public void merge(IndexWriter writer) throws IOException {

    assert !Thread.holdsLock(writer);

    this.writer = writer;

    initMergeThreadPriority();

    dir = writer.getDirectory();

    // First, quickly run through the newly proposed merges
    // and add any orthogonal merges (ie a merge not
    // involving segments already pending to be merged) to
    // the queue.  If we are way behind on merging, many of
    // these newly proposed merges will likely already be
    // registered.

    if (verbose()) {
      message("now merge");
      message("  index: " + writer.segString());
    }
    
    // Iterate, pulling from the IndexWriter's queue of
    // pending merges, until it's empty:
    while (true) {

      synchronized(this) {
        long startStallTime = 0;
        while (mergeThreadCount() >= 1+maxMergeCount) {
          // Record the start only once, so the reported
          // duration covers the whole stall rather than
          // just the time since the last wakeup.
          if (startStallTime == 0) {
            startStallTime = System.currentTimeMillis();
          }
          if (verbose()) {
            message("    too many merges; stalling...");
          }
          try {
            // Woken by notifyAll() when a merge thread finishes
            wait();
          } catch (InterruptedException ie) {
            throw new ThreadInterruptedException(ie);
          }
        }

        if (verbose()) {
          if (startStallTime != 0) {
            message("  stalled for " + (System.currentTimeMillis()-startStallTime) + " msec");
          }
        }
      }


      // TODO: we could be careful about which merges to do in
      // the BG (eg maybe the "biggest" ones) vs FG, which
      // merges to do first (the easiest ones?), etc.
      MergePolicy.OneMerge merge = writer.getNextMerge();
      if (merge == null) {
        if (verbose()) {
          message("  no more merges pending; now return");
        }
        return;
      }

      // We do this w/ the primary thread to keep
      // deterministic assignment of segment names
      writer.mergeInit(merge);

      boolean success = false;
      try {
        synchronized(this) {
          // message() assumes verbose() was checked first
          if (verbose()) {
            message("  consider merge " + merge.segString(dir));
          }

          // OK to spawn a new merge thread to handle this
          // merge:
          final MergeThread merger = getMergeThread(writer, merge);
          mergeThreads.add(merger);
          if (verbose()) {
            message("    launch new thread [" + merger.getName() + "]");
          }

          merger.start();

          // Must call this after starting the thread else
          // the new thread is removed from mergeThreads
          // (since it's not alive yet):
          updateMergeThreads();

          success = true;
        }
      } finally {
        // If the thread could not be launched, release the
        // merge that was already initialized above.
        if (!success) {
          writer.mergeFinish(merge);
        }
      }
    }
  }

  /** Runs the given merge by delegating to
   *  {@link IndexWriter#merge} on the current writer.
   *
   *  @param merge the merge to run
   *  @throws IOException if the writer's merge throws it */
  protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
    this.writer.merge(merge);
  }

  /** Creates a new, not yet started {@link MergeThread}
   *  for the given merge.  The thread is a daemon, runs at
   *  the base merge thread priority and gets a sequentially
   *  numbered name.
   *
   *  @param writer writer the thread pulls further merges from
   *  @param merge first merge the thread will run
   *  @return the configured thread */
  protected synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
    final int ordinal = mergeThreadCount++;
    final MergeThread merger = new MergeThread(writer, merge);
    merger.setThreadPriority(mergeThreadPriority);
    merger.setDaemon(true);
    merger.setName("Lucene Merge Thread #" + ordinal);
    return merger;
  }

  /** Thread that runs one merge and then keeps pulling
   *  further merges from its writer until none are left. */
  protected class MergeThread extends Thread {

    // Writer this thread asks for follow-up merges
    IndexWriter tWriter;
    // Merge this thread was created for
    MergePolicy.OneMerge startMerge;
    // Merge most recently handed to doMerge; null before the first one
    MergePolicy.OneMerge runningMerge;
    // Set once run() is finishing; makes getCurrentMerge() return null
    private volatile boolean done;

    public MergeThread(IndexWriter writer, MergePolicy.OneMerge startMerge) throws IOException {
      this.startMerge = startMerge;
      this.tWriter = writer;
    }

    /** Records the merge this thread is about to run. */
    public synchronized void setRunningMerge(MergePolicy.OneMerge merge) {
      this.runningMerge = merge;
    }

    /** Returns the merge last recorded via
     *  {@link #setRunningMerge}, or null if there is none. */
    public synchronized MergePolicy.OneMerge getRunningMerge() {
      return this.runningMerge;
    }

    /** Returns null once this thread is done; otherwise
     *  the running merge if one was recorded, else the
     *  merge the thread was started with. */
    public synchronized MergePolicy.OneMerge getCurrentMerge() {
      if (done) {
        return null;
      }
      return runningMerge != null ? runningMerge : startMerge;
    }

    /** Best-effort priority change: failures to set the
     *  priority are ignored. */
    public void setThreadPriority(int pri) {
      try {
        setPriority(pri);
      } catch (NullPointerException npe) {
        // Strangely, Sun's JDK 1.5 on Linux sometimes
        // throws NPE out of here...
      } catch (SecurityException se) {
        // Ignore this because we will still run fine with
        // normal thread priority
      }
    }

    @Override
    public void run() {

      // The first pass through the loop below runs the
      // merge that we were started with:
      MergePolicy.OneMerge next = this.startMerge;

      try {

        if (verbose()) {
          message("  merge thread: start");
        }

        do {
          setRunningMerge(next);
          doMerge(next);

          // Later passes run any new merge that the writer
          // says is necessary; stop when there is none:
          next = tWriter.getNextMerge();
          if (next != null) {
            tWriter.mergeInit(next);
            updateMergeThreads();
            if (verbose()) {
              message("  merge thread: do another merge " + next.segString(dir));
            }
          }
        } while (next != null);

        if (verbose()) {
          message("  merge thread: done");
        }

      } catch (Throwable exc) {

        // Ignore the exception if it was due to abort.
        // suppressExceptions is normally only set during
        // testing.
        final boolean aborted = exc instanceof MergePolicy.MergeAbortedException;
        if (!aborted && !suppressExceptions) {
          anyExceptions = true;
          handleMergeException(exc);
        }
      } finally {
        // Flag completion first so the scheduler no longer
        // counts this thread as having an active merge,
        // then wake up anyone stalled in merge().
        done = true;
        synchronized(ConcurrentMergeScheduler.this) {
          updateMergeThreads();
          ConcurrentMergeScheduler.this.notifyAll();
        }
      }
    }

    @Override
    public String toString() {
      final MergePolicy.OneMerge recorded = getRunningMerge();
      final MergePolicy.OneMerge shown = recorded == null ? startMerge : recorded;
      return "merge thread: " + shown.segString(dir);
    }
  }

  /** Called when an exception is hit in a background merge
   *  thread.  Sleeps briefly and then rethrows the problem
   *  wrapped in a {@link MergePolicy.MergeException}.
   *
   *  @param exc the exception hit by the merge thread
   *  @throws ThreadInterruptedException if interrupted
   *          during the sleep */
  protected void handleMergeException(Throwable exc) {
    // When an exception is hit during merge, IndexWriter
    // removes any partial files and then allows another
    // merge to run.  If whatever caused the error is not
    // transient then the exception will keep happening,
    // so, we sleep here to avoid saturating CPU in such
    // cases:
    final long backoffMillis = 250;
    try {
      Thread.sleep(backoffMillis);
    } catch (InterruptedException interrupted) {
      throw new ThreadInterruptedException(interrupted);
    }
    throw new MergePolicy.MergeException(exc, dir);
  }

  // Set by merge threads when a merge hits a non-abort
  // exception that is not being suppressed.
  static boolean anyExceptions = false;

  /** Used for testing.  Waits for the merge threads of all
   *  registered schedulers to finish, then returns whether
   *  any merge exception was recorded and resets the flag.
   *
   *  @throws RuntimeException if test mode was never
   *          enabled via {@link #setTestMode} */
  public static boolean anyUnhandledExceptions() {
    if (allInstances == null) {
      throw new RuntimeException("setTestMode() was not called; often this is because your test case's setUp method fails to call super.setUp in LuceneTestCase");
    }
    synchronized(allInstances) {
      // Make sure all outstanding threads are done so we see
      // any exceptions they may produce:
      for (int idx = 0, total = allInstances.size(); idx < total; idx++) {
        allInstances.get(idx).sync();
      }
      final boolean sawException = anyExceptions;
      anyExceptions = false;
      return sawException;
    }
  }

  /** Used for testing.  Resets the flag that records
   *  whether any merge thread hit an exception.  When test
   *  mode was never enabled there is no instance list to
   *  lock on, so the flag is simply cleared instead of
   *  failing with a NullPointerException. */
  public static void clearUnhandledExceptions() {
    // Read the field once so the null check and the lock
    // refer to the same object.
    final List<ConcurrentMergeScheduler> instances = allInstances;
    if (instances == null) {
      anyExceptions = false;
      return;
    }
    synchronized(instances) {
      anyExceptions = false;
    }
  }

  /** Used for testing.  Compacts the static instance list,
   *  dropping schedulers that are closed and have no active
   *  merge threads, and then appends this instance. */
  private void addMyself() {
    synchronized(allInstances) {
      final int originalSize = allInstances.size();
      int kept = 0;
      for (int idx = 0; idx < originalSize; idx++) {
        final ConcurrentMergeScheduler candidate = allInstances.get(idx);
        // Keep this one for now if it still has threads or
        // may spawn new threads
        if (!candidate.closed || candidate.mergeThreadCount() != 0) {
          allInstances.set(kept, candidate);
          kept++;
        }
      }
      // Everything past the kept prefix is stale
      allInstances.subList(kept, allInstances.size()).clear();
      allInstances.add(this);
    }
  }

  // When true, exceptions hit by merge threads are neither
  // recorded nor passed to handleMergeException.
  private boolean suppressExceptions;

  /** Used for testing: turns exception suppression on. */
  void setSuppressExceptions() {
    this.suppressExceptions = true;
  }

  /** Used for testing: turns exception suppression off. */
  void clearSuppressExceptions() {
    this.suppressExceptions = false;
  }

  /** Used for testing: schedulers created while test mode
   *  is on; null until {@link #setTestMode} is called. */
  private static List<ConcurrentMergeScheduler> allInstances;
  
  /** Enables test mode by creating a fresh, empty instance
   *  list.
   *
   *  @deprecated this test mode code will be removed in a future release */
  @Deprecated
  public static void setTestMode() {
    final List<ConcurrentMergeScheduler> fresh = new ArrayList<ConcurrentMergeScheduler>();
    allInstances = fresh;
  }
}

Other Lucene examples (source code examples)

Here is a short list of links related to this Lucene ConcurrentMergeScheduler.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.