alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Lucene example source code file (TaskSequence.java)

This example Lucene source code file (TaskSequence.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Lucene tags/keywords

arraylist, exception, exception, interruptedexception, nomoredataexception, override, override, paralleltask, perftask, perftask, runbackgroundtask, string, tasksequence, tasksequence, text, util

The Lucene TaskSequence.java source code

package org.apache.lucene.benchmark.byTask.tasks;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.ArrayList;
import java.util.List;
import java.text.NumberFormat;

import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
import org.apache.lucene.benchmark.byTask.stats.TaskStats;
import org.apache.lucene.util.ArrayUtil;

/**
 * Sequence of parallel or sequential tasks.
 */
public class TaskSequence extends PerfTask {
  /** Special repetitions value: keep repeating the sequence until it is marked exhausted. */
  public static final int REPEAT_EXHAUST = -2;
  private ArrayList<PerfTask> tasks;              // child tasks, in the order they were added
  private int repetitions = 1;
  private boolean parallel;
  private TaskSequence parent;
  private boolean letChildReport = true;
  private int rate = 0;
  private boolean perMin = false; // unit of rate: per minute if true, otherwise (the default) per second.
  private String seqName;
  private boolean exhausted = false;
  private boolean resetExhausted = false;
  private PerfTask[] tasksArray;                  // array copy of tasks, built lazily by initTasksArray()
  private boolean anyExhaustibleTasks;
  private boolean collapsable = false; // to not collapse external sequence named in alg.
  
  private boolean fixedTime;                      // true if we run for fixed time
  private double runTimeSec;                      // how long to run for
  private final long logByTimeMsec;

  /**
   * Creates a sequence of tasks.
   *
   * @param runData  run data handed to the superclass; its config is read for
   *                 <code>report.time.step.msec</code> (default 0).
   * @param name     name of the sequence; when null the sequence is marked collapsable
   *                 and is named "Par" or "Seq" according to <code>parallel</code>.
   * @param parent   the enclosing sequence, later returned by {@link #getParent()}.
   * @param parallel true to run the child tasks in parallel threads, false to run them serially.
   */
  public TaskSequence (PerfRunData runData, String name, TaskSequence parent, boolean parallel) {
    super(runData);
    collapsable = (name == null);
    // Assign the fields before deriving the sequence name: setSequenceName() reads
    // this.parallel to decide whether to append "_Par", and would otherwise always
    // see the default value (false) here.
    this.parent = parent;
    this.parallel = parallel;
    name = (name!=null ? name : (parallel ? "Par" : "Seq"));
    setName(name);
    setSequenceName();
    tasks = new ArrayList<PerfTask>();
    logByTimeMsec = runData.getConfig().get("report.time.step.msec", 0);
  }

  /**
   * Closes every child task and then the doc maker of the run data.
   * The doc maker is closed even when closing one of the child tasks fails.
   *
   * @throws Exception if closing a child task or the doc maker fails.
   */
  @Override
  public void close() throws Exception {
    initTasksArray();
    try {
      for(int i=0;i<tasksArray.length;i++) {
        tasksArray[i].close();
      }
    } finally {
      // release the doc maker even if a child task failed to close
      getRunData().getDocMaker().close();
    }
  }

  /**
   * Lazily copies the task list into {@link #tasksArray} and notes whether any child
   * is a {@link ResetInputsTask} or a nested {@link TaskSequence}.
   * On every call, a serial sequence that logs by time and does not let its children
   * report gets a fresh single-slot {@link #countsByTime} array.
   */
  private void initTasksArray() {
    if (tasksArray == null) {
      tasksArray = tasks.toArray(new PerfTask[tasks.size()]);
      for (final PerfTask child : tasksArray) {
        if (child instanceof ResetInputsTask || child instanceof TaskSequence) {
          anyExhaustibleTasks = true;
        }
      }
    }
    final boolean logsByTime = (logByTimeMsec != 0);
    if (logsByTime && !parallel && !letChildReport) {
      countsByTime = new int[1];
    }
  }

  /**
   * Tells how the child tasks of this sequence are executed.
   * @return true if this sequence was created as a parallel one, false if it is serial.
   */
  public boolean isParallel() {
    return this.parallel;
  }

  /**
   * Returns the current repetition count. This is the value last passed to
   * {@link #setRepetitions(int)} (possibly {@link #REPEAT_EXHAUST}), or, after a
   * fixed-time serial run, the number of rounds that run actually completed.
   * @return the number of repetitions.
   */
  public int getRepetitions() {
    return this.repetitions;
  }

  // Work counts per time slot, each slot spanning logByTimeMsec milliseconds.
  // Allocated by initTasksArray() for a serial sequence when logByTimeMsec != 0 and
  // child reporting is off; a parallel sequence sums its child sequences' arrays into it.
  private int[] countsByTime;

  /**
   * Switches this sequence to fixed-time mode: a serial run keeps repeating its
   * tasks until more than the given time has elapsed.
   * @param sec how long to run, in seconds.
   * @throws Exception declared for callers; nothing is thrown here.
   */
  public void setRunTime(double sec) throws Exception {
    this.fixedTime = true;
    this.runTimeSec = sec;
  }

  /**
   * Sets how many times the sequence is repeated and leaves fixed-time mode.
   * The request is validated before any state is changed, so a rejected call
   * leaves this sequence exactly as it was.
   *
   * @param repetitions The repetitions to set, or {@link #REPEAT_EXHAUST}.
   * @throws Exception if {@link #REPEAT_EXHAUST} is requested for a parallel sequence.
   */
  public void setRepetitions(int repetitions) throws Exception {
    if (repetitions==REPEAT_EXHAUST && isParallel()) {
      throw new Exception("REPEAT_EXHAUST is not allowed for parallel tasks");
    }
    fixedTime = false;
    this.repetitions = repetitions;
    // the sequence name embeds the repetition count
    setSequenceName();
  }

  /**
   * Returns the sequence that was given as parent when this one was constructed.
   * @return the parent sequence, possibly null.
   */
  public TaskSequence getParent() {
    return this.parent;
  }

  /*
   * (non-Javadoc)
   * Clears both exhaustion flags, then runs the child tasks in parallel or
   * serially according to how this sequence was created.
   * @see org.apache.lucene.benchmark.byTask.tasks.PerfTask#doLogic()
   */
  @Override
  public int doLogic() throws Exception {
    resetExhausted = false;
    exhausted = false;
    if (parallel) {
      return doParallelTasks();
    }
    return doSerialTasks();
  }

  /**
   * Thread that runs a single task and remembers the count that task returned.
   */
  private static class RunBackgroundTask extends Thread {
    private final PerfTask task;
    private final boolean letChildReport;
    // written by run(), read through getCount(); stays 0 until run() stores a result
    private volatile int count;

    public RunBackgroundTask(PerfTask task, boolean letChildReport) {
      this.letChildReport = letChildReport;
      this.task = task;
    }

    /** Returns the count produced by the task, as stored by {@link #run()}. */
    public int getCount() {
      return this.count;
    }

    /** Passes a stop request on to the wrapped task. */
    public void stopNow() throws InterruptedException {
      this.task.stopNow();
    }

    /** Runs the task; any exception it throws is rethrown wrapped in a RuntimeException. */
    @Override
    public void run() {
      final int result;
      try {
        result = this.task.runAndMaybeStats(this.letChildReport);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      this.count = result;
    }
  }

  /**
   * Runs the child tasks one after the other in the calling thread, delegating to
   * {@link #doSerialTasksWithRate()} when a rate is set. Tasks flagged to run in the
   * background are started in their own thread instead; once the foreground work is
   * done they are asked to stop and joined, and their counts are added to the result.
   * <p>
   * The rounds go on while in fixed-time mode (until the run time has elapsed), while
   * repeating to exhaustion and not yet exhausted, or until the repetition count is reached.
   *
   * @return the sum of the counts returned by the executed tasks.
   * @throws Exception whatever a foreground task throws, other than NoMoreDataException
   *         which only marks this sequence as exhausted.
   */
  private int doSerialTasks() throws Exception {
    if (rate > 0) {
      return doSerialTasksWithRate();
    }
    
    initTasksArray();
    int count = 0;

    final long runTime = (long) (runTimeSec*1000);
    List<RunBackgroundTask> bgTasks = null;

    final long t0 = System.currentTimeMillis();
    try {
      for (int k=0; fixedTime || (repetitions==REPEAT_EXHAUST && !exhausted) || k<repetitions; k++) {
        if (stopNow) {
          break;
        }
        for(int l=0;l<tasksArray.length;l++) {
          final PerfTask task = tasksArray[l];
          if (task.getRunInBackground()) {
            if (bgTasks == null) {
              bgTasks = new ArrayList<RunBackgroundTask>();
            }
            RunBackgroundTask bgTask = new RunBackgroundTask(task, letChildReport);
            // Thread.setPriority() throws IllegalArgumentException outside
            // [MIN_PRIORITY, MAX_PRIORITY], so clamp the requested priority.
            final int wantedPriority = task.getBackgroundDeltaPriority() + Thread.currentThread().getPriority();
            bgTask.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, wantedPriority)));
            bgTask.start();
            bgTasks.add(bgTask);
          } else {
            try {
              final int inc = task.runAndMaybeStats(letChildReport);
              count += inc;
              if (countsByTime != null) {
                // slot index = elapsed time in units of logByTimeMsec
                final int slot = (int) ((System.currentTimeMillis()-t0)/logByTimeMsec);
                if (slot >= countsByTime.length) {
                  countsByTime = ArrayUtil.grow(countsByTime, 1+slot);
                }
                countsByTime[slot] += inc;
              }
              if (anyExhaustibleTasks) {
                updateExhausted(task);
              }
            } catch (NoMoreDataException e) {
              exhausted = true;
            }
          }
        }
        if (fixedTime && System.currentTimeMillis()-t0 > runTime) {
          // remember how many rounds fit into the fixed run time
          repetitions = k+1;
          break;
        }
      }
    } catch (Exception e) {
      // A foreground task failed: ask the background threads started so far to stop,
      // so they are not left running after this method has thrown.
      if (bgTasks != null) {
        for(RunBackgroundTask bgTask : bgTasks) {
          bgTask.stopNow();
        }
      }
      throw e;
    }

    if (bgTasks != null) {
      for(RunBackgroundTask bgTask : bgTasks) {
        bgTask.stopNow();
      }
      for(RunBackgroundTask bgTask : bgTasks) {
        bgTask.join();
        count += bgTask.getCount();
      }
    }

    if (countsByTime != null) {
      getRunData().getPoints().getCurrentStats().setCountsByTime(countsByTime, logByTimeMsec);
    }

    // the stop request has been honored; clear it for the next run
    stopNow = false;

    return count;
  }

  /**
   * Serial execution throttled to the configured rate: each task start is held back
   * until its scheduled start time, and scheduled times advance by a fixed step
   * derived from the rate, so the rate is met on average.
   *
   * @return the sum of the counts returned by the executed tasks.
   * @throws Exception whatever a task throws, other than NoMoreDataException which
   *         only marks this sequence as exhausted.
   */
  private int doSerialTasksWithRate() throws Exception {
    initTasksArray();
    final long stepMillis = (perMin ? 60000 : 1000) / rate;
    long dueTime = System.currentTimeMillis();
    int total = 0;
    final long startMillis = System.currentTimeMillis();
    for (int rep = 0; (repetitions == REPEAT_EXHAUST && !exhausted) || rep < repetitions; rep++) {
      if (stopNow) {
        break;
      }
      for (int i = 0; i < tasksArray.length; i++) {
        final PerfTask current = tasksArray[i];
        // poll in 1 msec naps so that a stop request is noticed promptly
        // TODO: better to use condition to notify
        while (!stopNow && dueTime - System.currentTimeMillis() > 0) {
          Thread.sleep(1);
        }
        if (stopNow) {
          break;
        }
        dueTime += stepMillis; // schedule relative to the plan, not to "now": aims at average rate.
        try {
          final int done = current.runAndMaybeStats(letChildReport);
          total += done;
          if (countsByTime != null) {
            final int bucket = (int) ((System.currentTimeMillis() - startMillis) / logByTimeMsec);
            if (bucket >= countsByTime.length) {
              countsByTime = ArrayUtil.grow(countsByTime, bucket + 1);
            }
            countsByTime[bucket] += done;
          }

          if (anyExhaustibleTasks) {
            updateExhausted(current);
          }
        } catch (NoMoreDataException e) {
          exhausted = true;
        }
      }
    }
    stopNow = false;
    return total;
  }

  /**
   * Updates the exhaustion flags of this sequence after the given child task has run.
   * A ResetInputsTask, or a child sequence that itself saw a reset, clears the
   * exhausted flag and records the reset; any other child sequence contributes its
   * own exhausted flag. Other task types leave the flags untouched.
   */
  private void updateExhausted(PerfTask task) {
    if (task instanceof ResetInputsTask) {
      resetExhausted = true;
      exhausted = false;
      return;
    }
    if (!(task instanceof TaskSequence)) {
      return;
    }
    final TaskSequence child = (TaskSequence) task;
    if (!child.resetExhausted) {
      if (child.exhausted) {
        exhausted = true;
      }
      return;
    }
    // consume the child's reset marker and propagate it upwards
    child.resetExhausted = false;
    resetExhausted = true;
    exhausted = false;
  }

  /**
   * Thread that runs one task on behalf of the enclosing parallel sequence and
   * updates that sequence's exhaustion state when done.
   */
  private class ParallelTask extends Thread {

    public final PerfTask task;
    public int count;

    public ParallelTask(PerfTask task) {
      this.task = task;
    }

    /**
     * Runs the task and adds its result to {@link #count}. NoMoreDataException marks
     * the enclosing sequence exhausted; any other exception is rethrown wrapped in a
     * RuntimeException.
     */
    @Override
    public void run() {
      try {
        final int done = this.task.runAndMaybeStats(TaskSequence.this.letChildReport);
        if (TaskSequence.this.anyExhaustibleTasks) {
          TaskSequence.this.updateExhausted(this.task);
        }
        this.count = this.count + done;
      } catch (NoMoreDataException e) {
        TaskSequence.this.exhausted = true;
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Requests this sequence to stop and forwards the request to the tasks of the
   * parallel threads created by the most recent parallel run, if any.
   */
  @Override
  public void stopNow() {
    super.stopNow();
    // Forwards stop request to children. Work on a snapshot of the field, and skip
    // empty slots: this may run while doParallelTasks() is still filling the array.
    final ParallelTask[] running = runningParallelTasks;
    if (running != null) {
      for(ParallelTask t : running) {
        if (t != null) {
          t.task.stopNow();
        }
      }
    }
  }

  // Threads created by the most recent doParallelTasks() call (the only place that
  // assigns this field); stopNow() forwards the stop request to the task of each entry.
  // NOTE(review): not volatile, although stopNow() can be called from a different
  // thread than the one running doParallelTasks() - confirm visibility is acceptable.
  ParallelTask[] runningParallelTasks;

  /**
   * Runs <code>repetitions</code> clones of every child task, each clone in its own
   * thread, waits for all of them and sums their counts. The per-time-slot counts of
   * child sequences are accumulated into {@link #countsByTime} and, if present,
   * stored in the stats that were current when this method was entered.
   *
   * @return the sum of the counts of all threads.
   * @throws Exception if cloning a task fails or the calling thread is interrupted.
   */
  private int doParallelTasks() throws Exception {

    // fetched up front, before any child thread is started
    final TaskStats stats = getRunData().getPoints().getCurrentStats();

    initTasksArray();
    final ParallelTask t[] = new ParallelTask[repetitions * tasks.size()];
    // prepare threads
    int index = 0;
    for (int k=0; k<repetitions; k++) {
      for (int i = 0; i < tasksArray.length; i++) {
        final PerfTask task = (PerfTask) tasksArray[i].clone();
        t[index++] = new ParallelTask(task);
      }
    }
    // Publish the array only once it is completely filled, so that a concurrent
    // stopNow() never iterates over empty slots.
    runningParallelTasks = t;

    // run threads
    startThreads(t);

    // A stop request raised before the array was published could not be forwarded
    // by stopNow(); forward it now so that it is not lost.
    if (stopNow) {
      for (int i = 0; i < t.length; i++) {
        t[i].task.stopNow();
      }
    }

    // wait for all threads to complete
    int count = 0;
    for (int i = 0; i < t.length; i++) {
      t[i].join();
      count += t[i].count;
      if (t[i].task instanceof TaskSequence) {
        TaskSequence sub = (TaskSequence) t[i].task;
        if (sub.countsByTime != null) {
          if (countsByTime == null) {
            countsByTime = new int[sub.countsByTime.length];
          } else if (countsByTime.length < sub.countsByTime.length) {
            countsByTime = ArrayUtil.grow(countsByTime, sub.countsByTime.length);
          }
          for(int j=0;j<sub.countsByTime.length;j++) {
            countsByTime[j] += sub.countsByTime[j];
          }
        }
      }
    }

    if (countsByTime != null) {
      stats.setCountsByTime(countsByTime, logByTimeMsec);
    }

    // the stop request (if any) has been honored; clear it for the next run,
    // as the serial execution paths do
    stopNow = false;

    // return total count
    return count;
  }

  /**
   * Starts the given threads: all at once when no rate is set, otherwise paced
   * by {@link #startlThreadsWithRate(ParallelTask[])}.
   */
  private void startThreads(ParallelTask[] t) throws InterruptedException {
    if (rate <= 0) {
      for (final ParallelTask thread : t) {
        thread.start();
      }
      return;
    }
    startlThreadsWithRate(t);
  }

  /**
   * Starts the given threads one by one, sleeping before each start until its
   * scheduled time; scheduled times advance by a fixed step derived from the rate.
   */
  private void startlThreadsWithRate(ParallelTask[] t) throws InterruptedException {
    final long stepMillis = (perMin ? 60000 : 1000) / rate;
    long dueTime = System.currentTimeMillis();
    for (final ParallelTask thread : t) {
      final long remaining = dueTime - System.currentTimeMillis();
      if (remaining > 0) {
        Thread.sleep(remaining);
      }
      dueTime += stepMillis; // this aims at average rate of starting threads.
      thread.start();
    }
  }

  /**
   * Appends a child task to this sequence and sets its depth to one more than
   * the depth of this sequence.
   * @param task the task to add.
   */
  public void addTask(PerfTask task) {
    this.tasks.add(task);
    final int childDepth = getDepth() + 1;
    task.setDepth(childDepth);
  }
  
  /* (non-Javadoc)
   * Renders this sequence and its child tasks, one child per line, followed by the
   * closing bracket and the run time / repetitions, rate and background markers.
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    final String indent = getPadding();
    final StringBuilder out = new StringBuilder(super.toString());
    out.append(parallel ? " [" : " {").append(NEW_LINE);
    for (final PerfTask child : tasks) {
      out.append(child.toString()).append(NEW_LINE);
    }
    out.append(indent);

    // ">" marks a sequence whose children do not report separately
    final String closer;
    if (!letChildReport) {
      closer = ">";
    } else if (parallel) {
      closer = "]";
    } else {
      closer = "}";
    }
    out.append(closer);

    if (fixedTime) {
      out.append(" ").append(NumberFormat.getNumberInstance().format(runTimeSec)).append("s");
    } else if (repetitions > 1) {
      out.append(" * ").append(repetitions);
    } else if (repetitions == REPEAT_EXHAUST) {
      out.append(" * EXHAUST");
    }

    if (rate > 0) {
      out.append(",  rate: ").append(rate).append("/").append(perMin ? "min" : "sec");
    }

    if (getRunInBackground()) {
      out.append(" &");
      final int deltaPriority = getBackgroundDeltaPriority();
      if (deltaPriority != 0) {
        out.append(deltaPriority);
      }
    }
    return out.toString();
  }

  /**
   * Turns off separate reporting for the child tasks of this sequence and,
   * recursively, for the children of every nested sequence.
   */
  public void setNoChildReport() {
    letChildReport = false;
    for (final PerfTask child : tasks) {
      if (!(child instanceof TaskSequence)) {
        continue;
      }
      final TaskSequence nested = (TaskSequence) child;
      nested.setNoChildReport();
    }
  }

  /**
   * Returns the configured rate expressed per minute: a rate that was set per
   * second is multiplied by 60. A value of 0 means no rate limiting.
   * @return the rate per minute.
   */
  public int getRate() {
    if (perMin) {
      return rate;
    }
    return 60 * rate;
  }

  /**
   * Sets the rate at which tasks are started and refreshes the sequence name,
   * which embeds the rate.
   * @param rate The rate to set; values that are not positive disable rate limiting.
   * @param perMin true if the rate is per minute, false if it is per second.
   */
  public void setRate(int rate, boolean perMin) {
    this.perMin = perMin;
    this.rate = rate;
    setSequenceName();
  }

  /**
   * Rebuilds {@link #seqName} from the base task name plus suffixes for the
   * repetitions, the rate, and - for a parallel sequence whose name does not
   * already contain "par" in any letter case - a "_Par" marker.
   */
  private void setSequenceName() {
    String derived = super.getName();
    if (repetitions == REPEAT_EXHAUST) {
      derived += "_Exhaust";
    } else if (repetitions > 1) {
      derived += "_" + repetitions;
    }
    if (rate > 0) {
      derived += "_" + rate + (perMin ? "/min" : "/sec");
    }
    if (parallel && !derived.toLowerCase().contains("par")) {
      derived += "_Par";
    }
    seqName = derived;
  }

  /**
   * Returns the sequence name, i.e. the base task name extended with the
   * repetitions / rate / parallel suffixes computed by setSequenceName().
   */
  @Override
  public String getName() {
    return this.seqName;
  }

  /**
   * Gives access to the child tasks of this sequence. The internal list itself is
   * returned, not a copy.
   * @return the list of child tasks.
   */
  public ArrayList<PerfTask> getTasks() {
    return this.tasks;
  }

  /* (non-Javadoc)
   * Creates a copy of this sequence whose child tasks are clones of this
   * sequence's child tasks, and which shares no cached run state with the original.
   * @see java.lang.Object#clone()
   */
  @Override
  protected Object clone() throws CloneNotSupportedException {
    TaskSequence res = (TaskSequence) super.clone();
    res.tasks = new ArrayList<PerfTask>();
    for (int i = 0; i < tasks.size(); i++) {
      res.tasks.add((PerfTask)tasks.get(i).clone());
    }
    // super.clone() is shallow. Drop the state derived from a previous run so the
    // copy does not keep executing the ORIGINAL child tasks through a stale
    // tasksArray, nor share count / thread arrays with the original.
    // initTasksArray() rebuilds tasksArray from the cloned task list on first use.
    res.tasksArray = null;
    res.countsByTime = null;
    res.runningParallelTasks = null;
    return res;
  }

  /**
   * Tells whether this sequence may be collapsed when it is the outermost sequence.
   * This is the case for a sequence that was created without an explicit name.
   * @return true if this sequence can be collapsed.
   */
  public boolean isCollapsable() {
    return this.collapsable;
  }
  
}

Other Lucene examples (source code examples)

Here is a short list of links related to this Lucene TaskSequence.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.