
Lucene example source code file (CachingCollector.java)

This example Lucene source code file (CachingCollector.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Java - Lucene tags/keywords

arraylist, cachedscorer, cachingcollector, collector, io, ioexception, max_array_size, noscorecachingcollector, override, scorecachingcollector, segstart, util

The Lucene CachingCollector.java source code

package org.apache.lucene.search;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.RamUsageEstimator;

/**
 * Caches all docs, and optionally also scores, coming from
 * a search, and is then able to replay them to another
 * collector.  You specify the max RAM this class may use.
 * Once the collection is done, call {@link #isCached}.  If
 * this returns true, you can use {@link #replay} against a
 * new collector.  If it returns false, this means too much
 * RAM was required and you must instead re-run the original
 * search.
 *
 * <p>NOTE: this class consumes 4 bytes (or 8 bytes, if
 * scoring is cached) per collected document.  If the result
 * set is large this can easily be a very substantial amount
 * of RAM!
 * 
 * <p>NOTE: this class caches at least 128 documents
 * before checking RAM limits.
 * 
 * <p>See the Lucene contrib/grouping module for more
 * details including a full code example.</p>
 *
 * @lucene.experimental
 */
public abstract class CachingCollector extends Collector {
  
  // Max out at 512K arrays
  private static final int MAX_ARRAY_SIZE = 512 * 1024;
  private static final int INITIAL_ARRAY_SIZE = 128;
  private final static int[] EMPTY_INT_ARRAY = new int[0];

  private static class SegStart {
    public final IndexReader reader;
    public final int base;
    public final int end;

    public SegStart(IndexReader reader, int base, int end) {
      this.reader = reader;
      this.base = base;
      this.end = end;
    }
  }
  
  // A Scorer stub that exposes a pre-computed (cached) doc ID and score;
  // the iteration methods are unsupported because docs are pushed to the
  // wrapped collector via collect().
  private static final class CachedScorer extends Scorer {

    // NOTE: these members are package-private b/c that way accessing them from
    // the outer class does not incur access check by the JVM. The same
    // situation would be if they were defined in the outer class as private
    // members.
    int doc;
    float score;
    
    private CachedScorer() { super((Weight) null); }

    @Override
    public final float score() { return score; }

    @Override
    public final int advance(int target) { throw new UnsupportedOperationException(); }

    @Override
    public final int docID() { return doc; }

    @Override
    public final float freq() { throw new UnsupportedOperationException(); }

    @Override
    public final int nextDoc() { throw new UnsupportedOperationException(); }
  }

  // A CachingCollector which caches scores
  private static final class ScoreCachingCollector extends CachingCollector {

    private final CachedScorer cachedScorer;
    private final List<float[]> cachedScores;

    private Scorer scorer;
    private float[] curScores;

    ScoreCachingCollector(Collector other, double maxRAMMB) {
      super(other, maxRAMMB, true);

      cachedScorer = new CachedScorer();
      cachedScores = new ArrayList<float[]>();
      curScores = new float[INITIAL_ARRAY_SIZE];
      cachedScores.add(curScores);
    }

    ScoreCachingCollector(Collector other, int maxDocsToCache) {
      super(other, maxDocsToCache);

      cachedScorer = new CachedScorer();
      cachedScores = new ArrayList<float[]>();
      curScores = new float[INITIAL_ARRAY_SIZE];
      cachedScores.add(curScores);
    }

    @Override
    public void collect(int doc) throws IOException {

      if (curDocs == null) {
        // Cache was too large
        cachedScorer.score = scorer.score();
        cachedScorer.doc = doc;
        other.collect(doc);
        return;
      }

      // Allocate a bigger array or abort caching
      if (upto == curDocs.length) {
        base += upto;
        
        // Compute next array length - don't allocate too big arrays
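        // Growth is 8x per step (128 -> 1024 -> 8192 -> 65536 -> 524288),
        // after which the array size is capped at MAX_ARRAY_SIZE (512 * 1024).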
        int nextLength = 8*curDocs.length;
        if (nextLength > MAX_ARRAY_SIZE) {
          nextLength = MAX_ARRAY_SIZE;
        }

        if (base + nextLength > maxDocsToCache) {
          // try to allocate a smaller array
          nextLength = maxDocsToCache - base;
          if (nextLength <= 0) {
            // Too many docs to collect -- clear cache
            curDocs = null;
            curScores = null;
            cachedSegs.clear();
            cachedDocs.clear();
            cachedScores.clear();
            cachedScorer.score = scorer.score();
            cachedScorer.doc = doc;
            other.collect(doc);
            return;
          }
        }
        
        curDocs = new int[nextLength];
        cachedDocs.add(curDocs);
        curScores = new float[nextLength];
        cachedScores.add(curScores);
        upto = 0;
      }
      
      curDocs[upto] = doc;
      cachedScorer.score = curScores[upto] = scorer.score();
      upto++;
      cachedScorer.doc = doc;
      other.collect(doc);
    }

    @Override
    public void replay(Collector other) throws IOException {
      replayInit(other);
      
      int curUpto = 0;
      int curBase = 0;
      int chunkUpto = 0;
      curDocs = EMPTY_INT_ARRAY;
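      // Walk the cached segments in order; within each segment, stream doc
      // IDs and scores out of the chunked arrays, moving to the next chunk
      // when the current one is exhausted.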
      for (SegStart seg : cachedSegs) {
        other.setNextReader(seg.reader, seg.base);
        other.setScorer(cachedScorer);
        while (curBase + curUpto < seg.end) {
          if (curUpto == curDocs.length) {
            curBase += curDocs.length;
            curDocs = cachedDocs.get(chunkUpto);
            curScores = cachedScores.get(chunkUpto);
            chunkUpto++;
            curUpto = 0;
          }
          cachedScorer.score = curScores[curUpto];
          cachedScorer.doc = curDocs[curUpto];
          other.collect(curDocs[curUpto++]);
        }
      }
    }

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      this.scorer = scorer;
      other.setScorer(cachedScorer);
    }

    @Override
    public String toString() {
      if (isCached()) {
        return "CachingCollector (" + (base+upto) + " docs & scores cached)";
      } else {
        return "CachingCollector (cache was cleared)";
      }
    }

  }

  // A CachingCollector which does not cache scores
  private static final class NoScoreCachingCollector extends CachingCollector {
    
    NoScoreCachingCollector(Collector other, double maxRAMMB) {
     super(other, maxRAMMB, false);
    }

    NoScoreCachingCollector(Collector other, int maxDocsToCache) {
     super(other, maxDocsToCache);
    }
    
    @Override
    public void collect(int doc) throws IOException {

      if (curDocs == null) {
        // Cache was too large
        other.collect(doc);
        return;
      }

      // Allocate a bigger array or abort caching
      if (upto == curDocs.length) {
        base += upto;
        
        // Compute next array length - don't allocate too big arrays
        int nextLength = 8*curDocs.length;
        if (nextLength > MAX_ARRAY_SIZE) {
          nextLength = MAX_ARRAY_SIZE;
        }

        if (base + nextLength > maxDocsToCache) {
          // try to allocate a smaller array
          nextLength = maxDocsToCache - base;
          if (nextLength <= 0) {
            // Too many docs to collect -- clear cache
            curDocs = null;
            cachedSegs.clear();
            cachedDocs.clear();
            other.collect(doc);
            return;
          }
        }
        
        curDocs = new int[nextLength];
        cachedDocs.add(curDocs);
        upto = 0;
      }
      
      curDocs[upto] = doc;
      upto++;
      other.collect(doc);
    }

    @Override
    public void replay(Collector other) throws IOException {
      replayInit(other);
      
      int curUpto = 0;
      int curBase = 0;
      int chunkUpto = 0;
      curDocs = EMPTY_INT_ARRAY;
      for (SegStart seg : cachedSegs) {
        other.setNextReader(seg.reader, seg.base);
        while (curBase + curUpto < seg.end) {
          if (curUpto == curDocs.length) {
            curBase += curDocs.length;
            curDocs = cachedDocs.get(chunkUpto);
            chunkUpto++;
            curUpto = 0;
          }
          other.collect(curDocs[curUpto++]);
        }
      }
    }

    @Override
    public void setScorer(Scorer scorer) throws IOException {
      other.setScorer(scorer);
    }

    @Override
    public String toString() {
      if (isCached()) {
        return "CachingCollector (" + (base+upto) + " docs cached)";
      } else {
        return "CachingCollector (cache was cleared)";
      }
    }

  }

  // TODO: would be nice if a collector defined a
  // needsScores() method so we can specialize / do checks
  // up front. This is only relevant for the ScoreCaching 
  // version -- if the wrapped Collector does not need 
  // scores, it can avoid cachedScorer entirely.
  protected final Collector other;

  protected final int maxDocsToCache;
  protected final List<SegStart> cachedSegs = new ArrayList<SegStart>();
  protected final List<int[]> cachedDocs;

  private IndexReader lastReader;
  
  protected int[] curDocs;
  protected int upto;
  protected int base;
  protected int lastDocBase;

  /**
   * Creates a {@link CachingCollector} which does not wrap another collector.
   * The cached documents and scores can later be {@link #replay(Collector)
   * replayed}.
   * 
   * @param acceptDocsOutOfOrder
   *          whether documents are allowed to be collected out-of-order
   * @param cacheScores
   *          whether to cache scores in addition to document IDs
   * @param maxRAMMB
   *          the maximum RAM in MB to consume for caching the documents and
   *          scores
   */
  public static CachingCollector create(final boolean acceptDocsOutOfOrder, boolean cacheScores, double maxRAMMB) {
    Collector other = new Collector() {
      @Override
      public boolean acceptsDocsOutOfOrder() {
        return acceptDocsOutOfOrder;
      }
      
      @Override
      public void setScorer(Scorer scorer) throws IOException {}

      @Override
      public void collect(int doc) throws IOException {}

      @Override
      public void setNextReader(IndexReader reader, int docBase) throws IOException {}

    };
    return create(other, cacheScores, maxRAMMB);
  }

  /**
   * Create a new {@link CachingCollector} that wraps the given collector and
   * caches documents and scores up to the specified RAM threshold.
   * 
   * @param other
   *          the Collector to wrap and delegate calls to.
   * @param cacheScores
   *          whether to cache scores in addition to document IDs. Note that
   *          this increases the RAM consumed per doc
   * @param maxRAMMB
   *          the maximum RAM in MB to consume for caching the documents and
   *          scores. If the collector exceeds the threshold, no documents and
   *          scores are cached.
   */
  public static CachingCollector create(Collector other, boolean cacheScores, double maxRAMMB) {
    return cacheScores ? new ScoreCachingCollector(other, maxRAMMB) : new NoScoreCachingCollector(other, maxRAMMB);
  }

  /**
   * Create a new {@link CachingCollector} that wraps the given collector and
   * caches documents and scores up to the specified max docs threshold.
   *
   * @param other
   *          the Collector to wrap and delegate calls to.
   * @param cacheScores
   *          whether to cache scores in addition to document IDs. Note that
   *          this increases the RAM consumed per doc
   * @param maxDocsToCache
   *          the maximum number of documents for caching the documents and
   *          possibly the scores. If the collector exceeds the threshold,
   *          no documents and scores are cached.
   */
  public static CachingCollector create(Collector other, boolean cacheScores, int maxDocsToCache) {
    return cacheScores ? new ScoreCachingCollector(other, maxDocsToCache) : new NoScoreCachingCollector(other, maxDocsToCache);
  }
  
  // Prevent extension from non-internal classes
  private CachingCollector(Collector other, double maxRAMMB, boolean cacheScores) {
    this.other = other;

    cachedDocs = new ArrayList<int[]>();
    curDocs = new int[INITIAL_ARRAY_SIZE];
    cachedDocs.add(curDocs);

    int bytesPerDoc = RamUsageEstimator.NUM_BYTES_INT;
    if (cacheScores) {
      bytesPerDoc += RamUsageEstimator.NUM_BYTES_FLOAT;
    }
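    // Example: with maxRAMMB = 16.0 and cacheScores = true, bytesPerDoc is
    // 4 + 4 = 8 bytes, so maxDocsToCache = (16 * 1024 * 1024) / 8 = 2,097,152.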
    maxDocsToCache = (int) ((maxRAMMB * 1024 * 1024) / bytesPerDoc);
  }

  private CachingCollector(Collector other, int maxDocsToCache) {
    this.other = other;

    cachedDocs = new ArrayList<int[]>();
    curDocs = new int[INITIAL_ARRAY_SIZE];
    cachedDocs.add(curDocs);
    this.maxDocsToCache = maxDocsToCache;
  }

  @Override
  public boolean acceptsDocsOutOfOrder() {
    return other.acceptsDocsOutOfOrder();
  }

  public boolean isCached() {
    return curDocs != null;
  }

  // Records the span of the segment just finished (its reader, doc base, and
  // end position in the cache) so that replay can call setNextReader at the
  // right boundaries.
  @Override
  public void setNextReader(IndexReader reader, int docBase) throws IOException {
    other.setNextReader(reader, docBase);
    if (lastReader != null) {
      cachedSegs.add(new SegStart(lastReader, lastDocBase, base + upto));
    }
    lastDocBase = docBase;
    lastReader = reader;
  }

  /** Reused by the specialized inner classes. */
  void replayInit(Collector other) {
    if (!isCached()) {
      throw new IllegalStateException("cannot replay: cache was cleared because too much RAM was required");
    }
    
    if (!other.acceptsDocsOutOfOrder() && this.other.acceptsDocsOutOfOrder()) {
      throw new IllegalArgumentException(
          "cannot replay: given collector does not support "
              + "out-of-order collection, while the wrapped collector does. "
              + "Therefore cached documents may be out-of-order.");
    }
    
    //System.out.println("CC: replay totHits=" + (upto + base));
    if (lastReader != null) {
      cachedSegs.add(new SegStart(lastReader, lastDocBase, base+upto));
      lastReader = null;
    }
  }

  /**
   * Replays the cached doc IDs (and scores) to the given Collector. If this
   * instance does not cache scores, then no Scorer is set on the given
   * collector via {@code setScorer}, and scores are not replayed.
   * 
   * @throws IllegalStateException
   *           if this collector is not cached (i.e., if the RAM limits were too
   *           low for the number of documents + scores to cache).
   * @throws IllegalArgumentException
   *           if the given Collector does not support out-of-order collection,
   *           while the collector passed to the ctor does.
   */
  public abstract void replay(Collector other) throws IOException;
  
}
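
A CachingCollector usage example

To make the flow described in the class javadoc concrete, here is a minimal usage sketch: run the search once through the cache, then either replay the cached hits to a second collector or, if the RAM limit was exceeded, re-run the search. The IndexSearcher, Query, and second-pass Collector are hypothetical placeholders; TotalHitCountCollector serves only as a cheap first-pass collector.

import java.io.IOException;

import org.apache.lucene.search.CachingCollector;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TotalHitCountCollector;

public class CachingCollectorExample {

  // Runs the query once, caching doc IDs and scores in up to ~64 MB of RAM,
  // then feeds the hits to a second collector without re-executing the
  // query whenever the cache fit.
  static void searchTwice(IndexSearcher searcher, Query query,
                          Collector secondPass) throws IOException {
    TotalHitCountCollector firstPass = new TotalHitCountCollector();
    CachingCollector cache = CachingCollector.create(firstPass, true, 64.0);
    searcher.search(query, cache);
    if (cache.isCached()) {
      cache.replay(secondPass);            // replay cached docs + scores
    } else {
      searcher.search(query, secondPass);  // cache was cleared: re-run
    }
  }
}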
