alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (AbstractScheduledServiceTest.java)

This example Java source code file (AbstractScheduledServiceTest.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Learn more about this Java project at its project page.

Java - Java tags/keywords

abstractscheduledservice, atomicinteger, cyclicbarrier, exception, illegalstateexception, override, runnable, scheduledexecutorservice, scheduledfuture, scheduler, seconds, testabstractscheduledcustomservice, testservice, threading, threads, timeunit

The AbstractScheduledServiceTest.java Java example source code

/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.util.concurrent.AbstractScheduledService.Scheduler.newFixedDelaySchedule;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.testing.TestingExecutors;

import junit.framework.TestCase;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Unit test for {@link AbstractScheduledService}.
 *
 * @author Luke Sandberg
 */

public class AbstractScheduledServiceTest extends TestCase {

  // Schedule handed out by the test services in this class: fixed delay, no initial delay,
  // 10 ms between runs.
  volatile Scheduler configuration = newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
  // Most recent future returned by executor.scheduleWithFixedDelay (assigned in the override
  // below) so tests can inspect it.
  volatile ScheduledFuture<?> future = null;

  // NOTE(review): these three flags are not read or written anywhere else in this file.
  volatile boolean atFixedRateCalled = false;
  volatile boolean withFixedDelayCalled = false;
  volatile boolean scheduleCalled = false;

  // Shared 10-thread executor; every scheduleWithFixedDelay call is delegated to the superclass
  // and the resulting future is stored in the 'future' field above.
  final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
        long delay, TimeUnit unit) {
      return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
  };

  /**
   * Checks that the captured scheduled future is still pending while the service is running and
   * reports itself cancelled once the service has terminated.
   */
  public void testServiceStartStop() throws Exception {
    NullService nullService = new NullService();

    nullService.startAsync();
    nullService.awaitRunning();
    assertFalse(future.isDone());

    nullService.stopAsync();
    nullService.awaitTerminated();
    assertTrue(future.isCancelled());
  }

  /** Service with an empty iteration body that uses the shared schedule and executor. */
  private class NullService extends AbstractScheduledService {
    @Override
    protected ScheduledExecutorService executor() {
      return executor;
    }

    @Override
    protected Scheduler scheduler() {
      return configuration;
    }

    @Override
    protected void runOneIteration() throws Exception {
      // Intentionally empty.
    }
  }

  /**
   * Checks that an exception thrown from {@code runOneIteration()} fails the service: the
   * captured scheduled future ends up cancelled, {@code failureCause()} is the thrown exception,
   * and the state is {@code FAILED}.
   */
  public void testFailOnExceptionFromRun() throws Exception {
    TestService service = new TestService();
    service.runException = new Exception();
    service.startAsync().awaitRunning();
    // Walk the first iteration through both barriers; it throws runException right after them.
    service.runFirstBarrier.await();
    service.runSecondBarrier.await();
    try {
      future.get();
      fail();
    } catch (CancellationException expected) {
      // Expected: waiting on the scheduled future must report cancellation.
    }
    // The failure cause must be the original exception itself, not a wrapper around it.
    assertEquals(service.runException, service.failureCause());
    // Expected value first, matching the other state assertions in this class, so a failure
    // message labels the two values correctly.
    assertEquals(Service.State.FAILED, service.state());
  }

  /**
   * Checks that an exception thrown from {@code startUp()} is reported by
   * {@code awaitRunning()} as the cause of an {@link IllegalStateException}, that no iteration
   * ever ran, and that the service ends up {@code FAILED}.
   */
  public void testFailOnExceptionFromStartUp() {
    Exception startUpFailure = new Exception();
    TestService failing = new TestService();
    failing.startUpException = startUpFailure;

    try {
      failing.startAsync();
      failing.awaitRunning();
      fail();
    } catch (IllegalStateException thrown) {
      assertEquals(startUpFailure, thrown.getCause());
    }

    assertEquals(0, failing.numberOfTimesRunCalled.get());
    assertEquals(State.FAILED, failing.state());
  }

  /**
   * Registers a listener whose {@code running()} callback throws an {@link Error} and waits for
   * its {@code failed()} callback, which must see the transition from {@code RUNNING} with that
   * same error. Afterwards no iteration may have run and the service must be {@code FAILED}.
   */
  public void testFailOnErrorFromStartUpListener() throws InterruptedException {
    final Error error = new Error();
    final CountDownLatch failureObserved = new CountDownLatch(1);

    Service.Listener throwingListener = new Service.Listener() {
      @Override
      public void running() {
        throw error;
      }

      @Override
      public void failed(State from, Throwable failure) {
        assertEquals(State.RUNNING, from);
        assertEquals(error, failure);
        failureObserved.countDown();
      }
    };

    TestService service = new TestService();
    service.addListener(throwingListener, directExecutor());
    service.startAsync();
    failureObserved.await();

    assertEquals(0, service.numberOfTimesRunCalled.get());
    assertEquals(State.FAILED, service.state());
  }

  /**
   * Checks that an exception thrown from {@code shutDown()} surfaces through
   * {@code awaitTerminated()} as the cause of an {@link IllegalStateException} and leaves the
   * service in the {@code FAILED} state.
   */
  public void testFailOnExceptionFromShutDown() throws Exception {
    TestService service = new TestService();
    service.shutDownException = new Exception();
    service.startAsync().awaitRunning();
    // Meet the running iteration at its first barrier, request the stop while the iteration is
    // still parked before the second barrier, then release it.
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    try {
      service.awaitTerminated();
      fail();
    } catch (IllegalStateException e) {
      assertEquals(service.shutDownException, e.getCause());
    }
    assertEquals(Service.State.FAILED, service.state());
  }

  /**
   * Steps the service through nine iterations in lock step with the two barriers, checking the
   * run counter on each one, then stops the service during the tenth iteration.
   */
  public void testRunOneIterationCalledMultipleTimes() throws Exception {
    TestService service = new TestService();
    service.startAsync().awaitRunning();
    for (int i = 1; i < 10; i++) {
      // Between the two barriers the iteration is parked, so the counter is stable here.
      service.runFirstBarrier.await();
      assertEquals(i, service.numberOfTimesRunCalled.get());
      service.runSecondBarrier.await();
    }
    // Request the stop while an iteration is parked between the barriers, then release it.
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    service.stopAsync().awaitTerminated();
  }

  /**
   * Checks that {@code executor()} has been invoked exactly once after startup and still exactly
   * once after nine lock-stepped iterations and a full stop.
   */
  public void testExecutorOnlyCalledOnce() throws Exception {
    TestService service = new TestService();
    service.startAsync().awaitRunning();
    // It should be called once during startup.
    assertEquals(1, service.numberOfTimesExecutorCalled.get());
    for (int i = 1; i < 10; i++) {
      // The iteration is parked between the two barriers while the counter is checked.
      service.runFirstBarrier.await();
      assertEquals(i, service.numberOfTimesRunCalled.get());
      service.runSecondBarrier.await();
    }
    // Request the stop while an iteration is parked between the barriers, then release it.
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    service.stopAsync().awaitTerminated();
    // Only called once overall.
    assertEquals(1, service.numberOfTimesExecutorCalled.get());
  }

  /**
   * Captures the executor produced by the superclass {@code executor()} and checks that it
   * terminates within 100 ms after the service has been stopped.
   */
  public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
    // Holds the executor returned by the most recent call to the overridden executor().
    final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
    AbstractScheduledService service = new AbstractScheduledService() {
      @Override protected void runOneIteration() throws Exception {}

      @Override protected ScheduledExecutorService executor() {
        executor.set(super.executor());
        return executor.get();
      }

      @Override protected Scheduler scheduler() {
        return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
      }
    };

    service.startAsync();
    // NOTE(review): this invokes the overridden executor() again, which stores another
    // super.executor() result in the reference; the final assertion therefore checks whichever
    // executor was stored last. Confirm this is intended.
    assertFalse(service.executor().isShutdown());
    service.awaitRunning();
    service.stopAsync();
    service.awaitTerminated();
    assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
  }

  /**
   * Captures the executor produced by the superclass {@code executor()} for a service whose
   * {@code startUp()} throws, and checks that this executor terminates within 100 ms after the
   * startup failure has been observed.
   */
  public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
    final AtomicReference<ScheduledExecutorService> capturedExecutor = Atomics.newReference();
    AbstractScheduledService failingService = new AbstractScheduledService() {
      @Override
      protected Scheduler scheduler() {
        return newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
      }

      @Override
      protected ScheduledExecutorService executor() {
        capturedExecutor.set(super.executor());
        return capturedExecutor.get();
      }

      @Override
      protected void startUp() throws Exception {
        throw new Exception("Failed");
      }

      @Override
      protected void runOneIteration() throws Exception {
        // Intentionally empty.
      }
    };

    try {
      failingService.startAsync();
      failingService.awaitRunning();
      fail("Expected service to fail during startup");
    } catch (IllegalStateException expected) {
      // Expected: startup failed.
    }

    assertTrue(capturedExecutor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
  }

  /**
   * Checks that {@code scheduler()} has been invoked exactly once after startup and still exactly
   * once after nine lock-stepped iterations and a full stop.
   */
  public void testSchedulerOnlyCalledOnce() throws Exception {
    TestService service = new TestService();
    service.startAsync().awaitRunning();
    // It should be called once during startup.
    assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    for (int i = 1; i < 10; i++) {
      // The iteration is parked between the two barriers while the counter is checked.
      service.runFirstBarrier.await();
      assertEquals(i, service.numberOfTimesRunCalled.get());
      service.runSecondBarrier.await();
    }
    // Request the stop while an iteration is parked between the barriers, then release it.
    service.runFirstBarrier.await();
    service.stopAsync();
    service.runSecondBarrier.await();
    service.awaitTerminated();
    // Only called once overall.
    assertEquals(1, service.numberOfTimesSchedulerCalled.get());
  }

  /**
   * Checks the message of the {@link TimeoutException} thrown by a timed {@code awaitRunning}
   * when the service cannot leave {@code STARTING}.
   */
  public void testTimeout() {
    // This service's executor never runs the commands it is given.
    Service neverStarts = new AbstractScheduledService() {
      @Override
      protected String serviceName() {
        return "Foo";
      }

      @Override
      protected void runOneIteration() throws Exception {
        // Intentionally empty.
      }

      @Override
      protected ScheduledExecutorService executor() {
        return TestingExecutors.noOpScheduledExecutor();
      }

      @Override
      protected Scheduler scheduler() {
        return newFixedDelaySchedule(0, 1, TimeUnit.NANOSECONDS);
      }
    };

    try {
      neverStarts.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
      fail("Expected timeout");
    } catch (TimeoutException timedOut) {
      assertThat(timedOut)
          .hasMessage("Timed out waiting for Foo [STARTING] to reach the RUNNING state.");
    }
  }

  /**
   * Instrumented service used by the tests in the enclosing class. It counts calls to
   * {@code runOneIteration()}, {@code executor()} and {@code scheduler()}, asserts that the
   * lifecycle callbacks happen in order, synchronizes every iteration with the test through two
   * two-party barriers, and throws the configured exception (if any) from {@code startUp()},
   * {@code runOneIteration()} or {@code shutDown()}.
   */
  private class TestService extends AbstractScheduledService {
    // Each iteration waits on the first barrier, then on the second; the test thread is the
    // other party of both.
    CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
    CyclicBarrier runSecondBarrier = new CyclicBarrier(2);

    // Lifecycle flags set by startUp() and shutDown().
    volatile boolean startUpCalled = false;
    volatile boolean shutDownCalled = false;
    // Invocation counters read by the tests.
    AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
    AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
    AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
    // Exceptions to throw from the corresponding callback; null means "do not throw".
    volatile Exception runException = null;
    volatile Exception startUpException = null;
    volatile Exception shutDownException = null;

    /**
     * Asserts that the service was started and not shut down, counts the call, asserts the
     * {@code RUNNING} state, waits on both barriers, and then throws {@code runException} if set.
     */
    @Override
    protected void runOneIteration() throws Exception {
      assertTrue(startUpCalled);
      assertFalse(shutDownCalled);
      numberOfTimesRunCalled.incrementAndGet();
      assertEquals(State.RUNNING, state());
      runFirstBarrier.await();
      runSecondBarrier.await();
      if (runException != null) {
        throw runException;
      }
    }

    /**
     * Asserts that neither lifecycle callback ran before, records the call, asserts the
     * {@code STARTING} state, and then throws {@code startUpException} if set.
     */
    @Override
    protected void startUp() throws Exception {
      assertFalse(startUpCalled);
      assertFalse(shutDownCalled);
      startUpCalled = true;
      assertEquals(State.STARTING, state());
      if (startUpException != null) {
        throw startUpException;
      }
    }

    /**
     * Asserts that startUp ran and shutDown did not, records the call, and then throws
     * {@code shutDownException} if set.
     */
    @Override
    protected void shutDown() throws Exception {
      assertTrue(startUpCalled);
      assertFalse(shutDownCalled);
      shutDownCalled = true;
      if (shutDownException != null) {
        throw shutDownException;
      }
    }

    /** Counts the call and returns the enclosing test's shared executor. */
    @Override
    protected ScheduledExecutorService executor() {
      numberOfTimesExecutorCalled.incrementAndGet();
      return executor;
    }

    /** Counts the call and returns the enclosing test's shared schedule. */
    @Override
    protected Scheduler scheduler() {
      numberOfTimesSchedulerCalled.incrementAndGet();
      return configuration;
    }
  }

  public static class SchedulerTest extends TestCase {
    // These constants are arbitrary and just used to make sure that the correct method is called
    // with the correct parameters.
    // Several methods in this class declare parameters with the same names (initialDelay, delay,
    // unit); inside those methods the unqualified name is the parameter, and the constant must be
    // written as SchedulerTest.<name>.
    private static final int initialDelay = 10;
    private static final int delay = 20;
    private static final TimeUnit unit = TimeUnit.MILLISECONDS;

    // Unique runnable object used for comparison.
    final Runnable testRunnable = new Runnable() {@Override public void run() {}};
    // Set by assertSingleCallWithCorrectParameters the first time it is invoked.
    boolean called = false;

    /**
     * Records that a scheduling call happened and checks its arguments against this class's
     * constants and {@code testRunnable}. A second invocation on the same instance fails.
     */
    private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
        long delay, TimeUnit unit) {
      // Guard first, then mark, so a repeated call trips the assertion.
      assertFalse(this.called);
      this.called = true;

      // The parameters shadow the class constants, hence the qualified names.
      long expectedInitialDelay = SchedulerTest.initialDelay;
      long expectedDelay = SchedulerTest.delay;
      assertEquals(expectedInitialDelay, initialDelay);
      assertEquals(expectedDelay, delay);
      assertEquals(SchedulerTest.unit, unit);
      assertEquals(testRunnable, command);
    }

    /**
     * Checks that a fixed-rate schedule calls {@code scheduleAtFixedRate} on the executor exactly
     * once, passing through the runnable, initial delay, period and unit it was created with.
     */
    public void testFixedRateSchedule() {
      Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
      Future<?> unused =
          schedule.schedule(
              null,
              new ScheduledThreadPoolExecutor(1) {
                @Override
                public ScheduledFuture<?> scheduleAtFixedRate(
                    Runnable command, long initialDelay, long period, TimeUnit unit) {
                  // Forward the period actually received. An unqualified 'delay' here resolves to
                  // the class constant, which would make the period check compare the constant
                  // with itself.
                  assertSingleCallWithCorrectParameters(command, initialDelay, period, unit);
                  return null;
                }
              },
              testRunnable);
      assertTrue(called);
    }

    /**
     * Checks that a fixed-delay schedule calls {@code scheduleWithFixedDelay} on the executor
     * exactly once, passing through the runnable, initial delay, delay and unit it was created
     * with.
     */
    public void testFixedDelaySchedule() {
      Scheduler fixedDelaySchedule = newFixedDelaySchedule(initialDelay, delay, unit);
      ScheduledExecutorService verifyingExecutor =
          new ScheduledThreadPoolExecutor(10) {
            @Override
            public ScheduledFuture<?> scheduleWithFixedDelay(
                Runnable command, long initialDelay, long delay, TimeUnit unit) {
              assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
              return null;
            }
          };

      Future<?> unused = fixedDelaySchedule.schedule(null, verifyingExecutor, testRunnable);

      assertTrue(called);
    }

    /**
     * Uses a fixed-delay schedule whose initial delay and delay are both {@code Long.MAX_VALUE}
     * seconds and checks that no iteration runs: waiting up to 5 seconds on the first barrier
     * must time out and the iteration counter must still be 0.
     */
    public void testFixedDelayScheduleFarFuturePotentiallyOverflowingScheduleIsNeverReached()
        throws Exception {
      TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
        @Override protected Scheduler scheduler() {
          return newFixedDelaySchedule(Long.MAX_VALUE, Long.MAX_VALUE, SECONDS);
        }
      };
      service.startAsync().awaitRunning();
      try {
        // An iteration would arrive at this barrier; timing out means none ran.
        service.firstBarrier.await(5, SECONDS);
        fail();
      } catch (TimeoutException expected) {
      }
      assertEquals(0, service.numIterations.get());
      service.stopAsync();
      service.awaitTerminated();
    }

    /**
     * Uses a custom scheduler whose next schedule is always {@code Long.MAX_VALUE} seconds away
     * and checks that no iteration runs: waiting up to 5 seconds on the first barrier must time
     * out and the iteration counter must still be 0.
     */
    public void testCustomSchedulerFarFuturePotentiallyOverflowingScheduleIsNeverReached()
        throws Exception {
      TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
        @Override protected Scheduler scheduler() {
          return new AbstractScheduledService.CustomScheduler() {
            @Override
            protected Schedule getNextSchedule() throws Exception {
              return new Schedule(Long.MAX_VALUE, SECONDS);
            }
          };
        }
      };
      service.startAsync().awaitRunning();
      try {
        // An iteration would arrive at this barrier; timing out means none ran.
        service.firstBarrier.await(5, SECONDS);
        fail();
      } catch (TimeoutException expected) {
      }
      assertEquals(0, service.numIterations.get());
      service.stopAsync();
      service.awaitTerminated();
    }

    /**
     * Custom scheduler that counts how often the next schedule is requested and always answers
     * with a delay of zero seconds.
     */
    private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
      public AtomicInteger scheduleCounter = new AtomicInteger();

      @Override
      protected Schedule getNextSchedule() throws Exception {
        scheduleCounter.getAndIncrement();
        Schedule immediate = new Schedule(0, SECONDS);
        return immediate;
      }
    }

    /**
     * Drives a task scheduled through {@code TestCustomScheduler} for two lock-stepped runs,
     * checking after each that the scheduler was asked for exactly one more schedule, then
     * cancels the returned future.
     *
     * <p>The thread pool created for this test is shut down in a {@code finally} block so its
     * worker threads do not outlive the test, whether it passes or fails.
     */
    public void testCustomSchedule_startStop() throws Exception {
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);
      // While true, every run of the task synchronizes with the test thread on both barriers.
      final AtomicBoolean shouldWait = new AtomicBoolean(true);
      Runnable task = new Runnable() {
        @Override public void run() {
          try {
            if (shouldWait.get()) {
              firstBarrier.await();
              secondBarrier.await();
            }
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      };
      TestCustomScheduler scheduler = new TestCustomScheduler();
      ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
      try {
        Future<?> future = scheduler.schedule(null, pool, task);
        firstBarrier.await();
        assertEquals(1, scheduler.scheduleCounter.get());
        secondBarrier.await();
        firstBarrier.await();
        assertEquals(2, scheduler.scheduleCounter.get());
        // Let later runs return immediately instead of parking on the barriers.
        shouldWait.set(false);
        secondBarrier.await();
        future.cancel(false);
      } finally {
        // Also interrupts a worker still parked on a barrier if an assertion above failed.
        pool.shutdownNow();
      }
    }

    /**
     * Stops a custom-scheduled service during its first iteration and checks that the iteration
     * counter is still 1 after waiting three schedule delays past termination.
     */
    public void testCustomSchedulerServiceStop() throws Exception {
      TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
      service.startAsync().awaitRunning();
      service.firstBarrier.await();
      assertEquals(1, service.numIterations.get());
      // Request the stop while the iteration is parked before the second barrier.
      service.stopAsync();
      service.secondBarrier.await();
      service.awaitTerminated();
      // Sleep for a while just to ensure that our task wasn't called again.
      Thread.sleep(unit.toMillis(3 * delay));
      assertEquals(1, service.numIterations.get());
    }

    /**
     * Repeatedly starts a service whose custom scheduler, once the service is no longer
     * {@code STARTING}, meets the test thread at a barrier and then throws; the test calls
     * {@code stopAsync()} right after passing that barrier. The test makes no assertions; it
     * passes if all 1000 rounds complete.
     */
    public void testCustomScheduler_deadlock() throws InterruptedException, BrokenBarrierException {
      // Shared by all rounds; a two-party CyclicBarrier resets after each trip.
      final CyclicBarrier inGetNextSchedule = new CyclicBarrier(2);
      // This will flakily deadlock, so run it multiple times to increase the flake likelihood
      for (int i = 0; i < 1000; i++) {
        Service service = new AbstractScheduledService() {
          @Override protected void runOneIteration() {}
          @Override protected Scheduler scheduler() {
            return new CustomScheduler() {
              @Override protected Schedule getNextSchedule() throws Exception {
                if (state() != State.STARTING) {
                  // Line up with the test thread so the failure below and stopAsync() overlap.
                  inGetNextSchedule.await();
                  Thread.yield();
                  throw new RuntimeException("boom");
                }
                return new Schedule(0, TimeUnit.NANOSECONDS);
              }
            };
          }
        };
        service.startAsync().awaitRunning();
        inGetNextSchedule.await();
        service.stopAsync();
      }
    }

    /**
     * Lets a zero-delay custom-scheduled service iterate without barriers for 50 ms, then turns
     * the barriers back on, stops the service while an iteration is parked, and checks that the
     * iteration counter does not change between that point and termination.
     */
    public void testBig() throws Exception {
      TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
        @Override protected Scheduler scheduler() {
          return new AbstractScheduledService.CustomScheduler() {
            @Override
            protected Schedule getNextSchedule() throws Exception {
              // Explicitly yield to increase the probability of a pathological scheduling.
              Thread.yield();
              return new Schedule(0, TimeUnit.SECONDS);
            }
          };
        }
      };
      // Free-running phase: iterations do not wait on the barriers.
      service.useBarriers = false;
      service.startAsync().awaitRunning();
      Thread.sleep(50);
      service.useBarriers = true;
      service.firstBarrier.await();
      // Snapshot taken while an iteration is parked between the barriers.
      int numIterations = service.numIterations.get();
      service.stopAsync();
      service.secondBarrier.await();
      service.awaitTerminated();
      assertEquals(numIterations, service.numIterations.get());
    }

    /**
     * Service that counts its iterations and, while {@code useBarriers} is set, synchronizes each
     * one with the test through two two-party barriers. Every call to {@code executor()} creates
     * a new 10-thread pool; nothing in this class shuts that pool down.
     */
    private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
      final AtomicInteger numIterations = new AtomicInteger(0);
      volatile boolean useBarriers = true;
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);

      @Override
      protected void runOneIteration() throws Exception {
        numIterations.incrementAndGet();
        if (!useBarriers) {
          return;
        }
        firstBarrier.await();
        secondBarrier.await();
      }

      @Override
      protected ScheduledExecutorService executor() {
        // Several threads make overlapping schedules more likely to show up.
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        return pool;
      }

      @Override
      protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override
          protected Schedule getNextSchedule() throws Exception {
            return new Schedule(delay, unit);
          }
        };
      }
    }

    /**
     * Runs three lock-stepped iterations of {@code TestFailingCustomScheduledService}, whose
     * scheduler throws once more than two iterations have been counted, and then expects
     * stopping the service to report the failure: {@code awaitTerminated} must throw
     * {@link IllegalStateException} and the state must be {@code FAILED}.
     */
    public void testCustomSchedulerFailure() throws Exception {
      TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
      service.startAsync().awaitRunning();
      for (int i = 1; i < 4; i++) {
        service.firstBarrier.await();
        assertEquals(i, service.numIterations.get());
        service.secondBarrier.await();
      }
      // Wait a second before stopping; presumably to let the scheduler failure register first.
      Thread.sleep(1000);
      try {
        service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
        fail();
      } catch (IllegalStateException e) {
        assertEquals(State.FAILED, service.state());
      }
    }

    /**
     * Service that counts its iterations, synchronizes each one with the test through two
     * two-party barriers, and whose custom scheduler throws {@link IllegalStateException} once
     * more than two iterations have been counted.
     */
    private static class TestFailingCustomScheduledService extends AbstractScheduledService {
      final AtomicInteger numIterations = new AtomicInteger(0);
      final CyclicBarrier firstBarrier = new CyclicBarrier(2);
      final CyclicBarrier secondBarrier = new CyclicBarrier(2);

      @Override
      protected Scheduler scheduler() {
        return new CustomScheduler() {
          @Override
          protected Schedule getNextSchedule() throws Exception {
            boolean keepScheduling = numIterations.get() <= 2;
            if (keepScheduling) {
              return new Schedule(delay, unit);
            }
            throw new IllegalStateException("Failed");
          }
        };
      }

      @Override
      protected ScheduledExecutorService executor() {
        // Several threads make overlapping schedules more likely to show up.
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        return pool;
      }

      @Override
      protected void runOneIteration() throws Exception {
        numIterations.incrementAndGet();
        firstBarrier.await();
        secondBarrier.await();
      }
    }
  }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java AbstractScheduledServiceTest.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.