|
Java example source code file (UninterruptiblesTest.java)
This example Java source code file (UninterruptiblesTest.java) is included in the alvinalexander.com
"Java Source Code Warehouse" project. The intent of this project is to help you
"Learn Java by Example"™.
Learn more about this Java project at its project page.
The UninterruptiblesTest.java Java example source code
/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.util.concurrent.InterruptionUtil.repeatedlyInterruptTestThread;
import static com.google.common.util.concurrent.Uninterruptibles.joinUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.tryAcquireUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.TearDown;
import com.google.common.testing.TearDownStack;
import junit.framework.TestCase;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
/**
* Tests for {@link Uninterruptibles}.
*
* @author Anthony Zana
*/
public class UninterruptiblesTest extends TestCase {
/** Element that {@code EnableReads} offers and that {@code takeSuccessfully} expects to receive. */
private static final String EXPECTED_TAKE = "expectedTake";
/**
* Timeout to use when we don't expect the timeout to expire.
*/
private static final long LONG_DELAY_MS = 2500;
/** Milliseconds subtracted from the requested sleep when computing the expected minimum in {@code sleepSuccessfully}. */
private static final long SLEEP_SLACK = 2;
/** Collects tear-downs registered during a test; {@code tearDown()} runs them. */
private final TearDownStack tearDownStack = new TearDownStack();
// NOTE: All durations in these tests are expressed in milliseconds
/**
 * Fails immediately if the test thread is already interrupted on entry, then registers a
 * tear-down that resets the thread's interrupt status.
 */
@Override
protected void setUp() {
  // A pending interrupt here is reported rather than silently cleared.
  boolean interruptedOnEntry = Thread.currentThread().isInterrupted();
  if (interruptedOnEntry) {
    throw new AssertionError("Thread interrupted on test entry. "
        + "Some test probably didn't clear the interrupt state");
  }
  TearDown clearInterruptStatus = new TearDown() {
    @Override
    public void tearDown() {
      // Thread.interrupted() reads and resets the current thread's interrupt flag.
      Thread.interrupted();
    }
  };
  tearDownStack.addTearDown(clearInterruptStatus);
}
/** Runs the tear-downs that were registered on {@code tearDownStack}. */
@Override
protected void tearDown() {
  this.tearDownStack.runTearDown();
}
/**
 * Runs {@link NullPointerTester} over all public static methods of {@link Uninterruptibles},
 * supplying default instances for {@link CountDownLatch} and {@link Semaphore} parameters.
 */
public void testNull() throws Exception {
  NullPointerTester tester = new NullPointerTester();
  tester.setDefault(CountDownLatch.class, new CountDownLatch(0));
  tester.setDefault(Semaphore.class, new Semaphore(999));
  tester.testAllPublicStaticMethods(Uninterruptibles.class);
}
// IncrementableCountDownLatch.await() tests
// CountDownLatch.await() tests
// BlockingQueue.put() tests
/**
 * Puts into a queue that has spare capacity and checks that the call returned in less than
 * {@code LONG_DELAY_MS} and that the element is at the head of the queue.
 */
public void testPutWithNoWait() {
  Stopwatch stopwatch = Stopwatch.createStarted();
  // Explicit type argument instead of the raw ArrayBlockingQueue, which compiled only with
  // an unchecked-conversion warning.
  BlockingQueue<String> queue = new ArrayBlockingQueue<String>(999);
  putUninterruptibly(queue, "");
  assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
  assertEquals("", queue.peek());
}
/** A put that has to wait 20 ms for space, with no interrupt requested, leaves the thread uninterrupted. */
public void testPutNoInterrupt() {
  TimedPutQueue timedQueue = TimedPutQueue.createWithDelay(20);
  timedQueue.putSuccessfully();
  assertNotInterrupted();
}
/**
 * Requests one interrupt while a put is waiting for space; the put must still complete and
 * the interrupt must be observable afterwards.
 */
public void testPutSingleInterrupt() {
  TimedPutQueue timedQueue = TimedPutQueue.createWithDelay(50);
  requestInterruptIn(10);
  timedQueue.putSuccessfully();
  assertInterrupted();
}
/**
 * Same as the single-interrupt put test, but with interrupts set up through
 * {@code repeatedlyInterruptTestThread}.
 */
public void testPutMultiInterrupt() {
  TimedPutQueue timedQueue = TimedPutQueue.createWithDelay(100);
  repeatedlyInterruptTestThread(20, tearDownStack);
  timedQueue.putSuccessfully();
  assertInterrupted();
}
// BlockingQueue.take() tests
/**
 * Takes from a queue that already holds an element and checks that the element is returned
 * in less than {@code LONG_DELAY_MS}.
 */
public void testTakeWithNoWait() {
  Stopwatch stopwatch = Stopwatch.createStarted();
  // Explicit type argument instead of the raw ArrayBlockingQueue, which compiled only with
  // an unchecked-conversion warning.
  BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
  assertTrue(queue.offer(""));
  assertEquals("", takeUninterruptibly(queue));
  assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
/** A take that has to wait 20 ms for an element, with no interrupt requested, leaves the thread uninterrupted. */
public void testTakeNoInterrupt() {
  TimedTakeQueue timedQueue = TimedTakeQueue.createWithDelay(20);
  timedQueue.takeSuccessfully();
  assertNotInterrupted();
}
/**
 * Requests one interrupt while a take is waiting for an element; the take must still complete
 * and the interrupt must be observable afterwards.
 */
public void testTakeSingleInterrupt() {
  TimedTakeQueue timedQueue = TimedTakeQueue.createWithDelay(50);
  requestInterruptIn(10);
  timedQueue.takeSuccessfully();
  assertInterrupted();
}
/**
 * Same as the single-interrupt take test, but with interrupts set up through
 * {@code repeatedlyInterruptTestThread}.
 */
public void testTakeMultiInterrupt() {
  TimedTakeQueue timedQueue = TimedTakeQueue.createWithDelay(100);
  repeatedlyInterruptTestThread(20, tearDownStack);
  timedQueue.takeSuccessfully();
  assertInterrupted();
}
// join() tests
/**
 * Joins a thread that has already terminated, using the untimed overload and then timeouts of
 * zero, a negative value and {@code LONG_DELAY_MS}; the whole sequence must finish in less
 * than {@code LONG_DELAY_MS}.
 */
public void testJoinWithNoWait() throws InterruptedException {
  Stopwatch stopwatch = Stopwatch.createStarted();
  Thread finished = new Thread(new JoinTarget(15));
  finished.start();
  finished.join();
  assertFalse(finished.isAlive());

  joinUninterruptibly(finished);
  long[] timeoutsMillis = {0, -42, LONG_DELAY_MS};
  for (long timeoutMillis : timeoutsMillis) {
    joinUninterruptibly(finished, timeoutMillis, MILLISECONDS);
  }
  assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
/** An untimed join on a thread that runs for 20 ms, with no interrupt requested, leaves the thread uninterrupted. */
public void testJoinNoInterrupt() {
  TimedThread target = TimedThread.createWithDelay(20);
  target.joinSuccessfully();
  assertNotInterrupted();
}
/** A timed join whose timeout is much longer than the thread's 20 ms run completes without interruption. */
public void testJoinTimeoutNoInterruptNotExpired() {
  TimedThread target = TimedThread.createWithDelay(20);
  target.joinSuccessfully(LONG_DELAY_MS);
  assertNotInterrupted();
}
/** A 30 ms timed join on a thread that runs for {@code LONG_DELAY_MS} times out with the thread unfinished. */
public void testJoinTimeoutNoInterruptExpired() {
  TimedThread target = TimedThread.createWithDelay(LONG_DELAY_MS);
  target.joinUnsuccessfully(30);
  assertNotInterrupted();
}
/**
 * Requests one interrupt during an untimed join; the join must still wait for the thread and
 * the interrupt must be observable afterwards.
 */
public void testJoinSingleInterrupt() {
  TimedThread target = TimedThread.createWithDelay(50);
  requestInterruptIn(10);
  target.joinSuccessfully();
  assertInterrupted();
}
/**
 * Requests one interrupt during a timed join whose timeout does not expire; the join must
 * still wait for the thread and the interrupt must be observable afterwards.
 */
public void testJoinTimeoutSingleInterruptNoExpire() {
  TimedThread target = TimedThread.createWithDelay(50);
  requestInterruptIn(10);
  target.joinSuccessfully(LONG_DELAY_MS);
  assertInterrupted();
}
/**
 * Requests one interrupt during a 50 ms timed join on a long-running thread; the join must
 * time out and the interrupt must be observable afterwards.
 */
public void testJoinTimeoutSingleInterruptExpired() {
  TimedThread target = TimedThread.createWithDelay(LONG_DELAY_MS);
  requestInterruptIn(10);
  target.joinUnsuccessfully(50);
  assertInterrupted();
}
/**
 * Untimed join with interrupts set up through {@code repeatedlyInterruptTestThread}; the join
 * must still wait for the thread and an interrupt must be observable afterwards.
 */
public void testJoinMultiInterrupt() {
  TimedThread target = TimedThread.createWithDelay(100);
  repeatedlyInterruptTestThread(20, tearDownStack);
  target.joinSuccessfully();
  assertInterrupted();
}
/**
 * Timed join (timeout not expiring) with interrupts set up through
 * {@code repeatedlyInterruptTestThread}; the join must still wait for the thread and an
 * interrupt must be observable afterwards.
 */
public void testJoinTimeoutMultiInterruptNoExpire() {
  TimedThread target = TimedThread.createWithDelay(100);
  repeatedlyInterruptTestThread(20, tearDownStack);
  target.joinSuccessfully(LONG_DELAY_MS);
  assertInterrupted();
}
/**
 * Timed join (70 ms) on a long-running thread with interrupts set up through
 * {@code repeatedlyInterruptTestThread}; the join must time out and an interrupt must be
 * observable afterwards.
 */
public void testJoinTimeoutMultiInterruptExpired() {
  /*
   * Scheduling a thread completion is not strictly required here. Doing so is the closest
   * available check that the remaining wait time shrinks on each successive join() call.
   */
  TimedThread target = TimedThread.createWithDelay(LONG_DELAY_MS);
  repeatedlyInterruptTestThread(20, tearDownStack);
  target.joinUnsuccessfully(70);
  assertInterrupted();
}
// sleep() Tests
/** Sleeps for 10 ms with no interrupt requested and checks the elapsed time. */
public void testSleepNoInterrupt() {
  final long sleepMillis = 10;
  sleepSuccessfully(sleepMillis);
}
/**
 * Requests one interrupt during a 50 ms sleep; the sleep must still last the full duration
 * and the interrupt must be observable afterwards.
 */
public void testSleepSingleInterrupt() {
  requestInterruptIn(10);
  final long sleepMillis = 50;
  sleepSuccessfully(sleepMillis);
  assertInterrupted();
}
/**
 * 100 ms sleep with interrupts set up through {@code repeatedlyInterruptTestThread}; the
 * sleep must still last the full duration and an interrupt must be observable afterwards.
 */
public void testSleepMultiInterrupt() {
  repeatedlyInterruptTestThread(10, tearDownStack);
  final long sleepMillis = 100;
  sleepSuccessfully(sleepMillis);
  assertInterrupted();
}
// Semaphore.tryAcquire() tests
/**
 * Acquires single permits from a semaphore that has plenty available, using timeouts of zero,
 * a negative value and {@code LONG_DELAY_MS}; every acquire must succeed and the whole
 * sequence must finish in less than {@code LONG_DELAY_MS}.
 */
public void testTryAcquireWithNoWait() {
  Stopwatch stopwatch = Stopwatch.createStarted();
  Semaphore semaphore = new Semaphore(99);
  long[] timeoutsMillis = {0, -42, LONG_DELAY_MS};
  for (long timeoutMillis : timeoutsMillis) {
    assertTrue(tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS));
  }
  assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
/** A timed acquire succeeds once permits are released after 20 ms, with no interruption. */
public void testTryAcquireTimeoutNoInterruptNotExpired() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(20);
  timedSemaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
  assertNotInterrupted();
}
/** A 30 ms timed acquire fails when permits are only released after {@code LONG_DELAY_MS}. */
public void testTryAcquireTimeoutNoInterruptExpired() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
  timedSemaphore.tryAcquireUnsuccessfully(30);
  assertNotInterrupted();
}
/**
 * Requests one interrupt during a timed acquire whose timeout does not expire; the acquire
 * must still succeed and the interrupt must be observable afterwards.
 */
public void testTryAcquireTimeoutSingleInterruptNoExpire() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(50);
  requestInterruptIn(10);
  timedSemaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
  assertInterrupted();
}
/**
 * Requests one interrupt during a 50 ms timed acquire that cannot succeed in time; the
 * acquire must time out and the interrupt must be observable afterwards.
 */
public void testTryAcquireTimeoutSingleInterruptExpired() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
  requestInterruptIn(10);
  timedSemaphore.tryAcquireUnsuccessfully(50);
  assertInterrupted();
}
/**
 * Timed acquire (timeout not expiring) with interrupts set up through
 * {@code repeatedlyInterruptTestThread}; the acquire must still succeed and an interrupt must
 * be observable afterwards.
 */
public void testTryAcquireTimeoutMultiInterruptNoExpire() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(100);
  repeatedlyInterruptTestThread(20, tearDownStack);
  timedSemaphore.tryAcquireSuccessfully(LONG_DELAY_MS);
  assertInterrupted();
}
/**
 * Timed acquire (70 ms) that cannot succeed in time, with interrupts set up through
 * {@code repeatedlyInterruptTestThread}; the acquire must time out and an interrupt must be
 * observable afterwards.
 */
public void testTryAcquireTimeoutMultiInterruptExpired() {
  /*
   * Scheduling a release() call is not strictly required here. Doing so is the closest
   * available check that the remaining wait time shrinks on each successive tryAcquire() call.
   */
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
  repeatedlyInterruptTestThread(20, tearDownStack);
  timedSemaphore.tryAcquireUnsuccessfully(70);
  assertInterrupted();
}
/**
 * Acquires ten permits at a time from a semaphore that has plenty available, using timeouts
 * of zero, a negative value and {@code LONG_DELAY_MS}; every acquire must succeed and the
 * whole sequence must finish in less than {@code LONG_DELAY_MS}.
 */
public void testTryAcquireWithNoWaitMultiPermit() {
  Stopwatch stopwatch = Stopwatch.createStarted();
  Semaphore semaphore = new Semaphore(99);
  long[] timeoutsMillis = {0, -42, LONG_DELAY_MS};
  for (long timeoutMillis : timeoutsMillis) {
    assertTrue(tryAcquireUninterruptibly(semaphore, 10, timeoutMillis, MILLISECONDS));
  }
  assertTimeNotPassed(stopwatch, LONG_DELAY_MS);
}
/** A timed ten-permit acquire succeeds once permits are released after 20 ms, with no interruption. */
public void testTryAcquireTimeoutNoInterruptNotExpiredMultiPermit() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(20);
  timedSemaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
  assertNotInterrupted();
}
/** A 30 ms timed ten-permit acquire fails when permits are only released after {@code LONG_DELAY_MS}. */
public void testTryAcquireTimeoutNoInterruptExpiredMultiPermit() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
  timedSemaphore.tryAcquireUnsuccessfully(10, 30);
  assertNotInterrupted();
}
/**
 * Requests one interrupt during a timed ten-permit acquire whose timeout does not expire; the
 * acquire must still succeed and the interrupt must be observable afterwards.
 */
public void testTryAcquireTimeoutSingleInterruptNoExpireMultiPermit() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(50);
  requestInterruptIn(10);
  timedSemaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
  assertInterrupted();
}
/**
 * Requests one interrupt during a 50 ms timed ten-permit acquire that cannot succeed in time;
 * the acquire must time out and the interrupt must be observable afterwards.
 */
public void testTryAcquireTimeoutSingleInterruptExpiredMultiPermit() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
  requestInterruptIn(10);
  timedSemaphore.tryAcquireUnsuccessfully(10, 50);
  assertInterrupted();
}
/**
 * Timed ten-permit acquire (timeout not expiring) with interrupts set up through
 * {@code repeatedlyInterruptTestThread}; the acquire must still succeed and an interrupt must
 * be observable afterwards.
 */
public void testTryAcquireTimeoutMultiInterruptNoExpireMultiPermit() {
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(100);
  repeatedlyInterruptTestThread(20, tearDownStack);
  timedSemaphore.tryAcquireSuccessfully(10, LONG_DELAY_MS);
  assertInterrupted();
}
/**
 * Timed ten-permit acquire (70 ms) that cannot succeed in time, with interrupts set up
 * through {@code repeatedlyInterruptTestThread}; the acquire must time out and an interrupt
 * must be observable afterwards.
 */
public void testTryAcquireTimeoutMultiInterruptExpiredMultiPermit() {
  /*
   * Scheduling a release() call is not strictly required here. Doing so is the closest
   * available check that the remaining wait time shrinks on each successive tryAcquire() call.
   */
  TimedSemaphore timedSemaphore = TimedSemaphore.createWithDelay(LONG_DELAY_MS);
  repeatedlyInterruptTestThread(20, tearDownStack);
  timedSemaphore.tryAcquireUnsuccessfully(10, 70);
  assertInterrupted();
}
/**
 * Pairs a {@link Stopwatch} with the number of milliseconds after which an awaited event is
 * expected to have occurred. The stopwatch is started when the {@code Completion} is
 * constructed.
 */
private static final class Completion {
  final Stopwatch stopwatch;
  final long expectedCompletionWaitMillis;

  Completion(long expectedCompletionWaitMillis) {
    this.expectedCompletionWaitMillis = expectedCompletionWaitMillis;
    this.stopwatch = Stopwatch.createStarted();
  }

  /**
   * Asserts that at least the expected wait has elapsed, and that less than the expected wait
   * plus {@code LONG_DELAY_MS} has elapsed.
   */
  void assertCompletionExpected() {
    assertAtLeastTimePassed(stopwatch, expectedCompletionWaitMillis);
    long upperBoundMillis = expectedCompletionWaitMillis + LONG_DELAY_MS;
    assertTimeNotPassed(stopwatch, upperBoundMillis);
  }

  /**
   * Asserts that at least {@code timeout} milliseconds have elapsed but the expected wait has
   * not. {@code timeout} must be strictly smaller than the expected wait.
   */
  void assertCompletionNotExpected(long timeout) {
    Preconditions.checkArgument(timeout < expectedCompletionWaitMillis);
    assertAtLeastTimePassed(stopwatch, timeout);
    assertTimeNotPassed(stopwatch, expectedCompletionWaitMillis);
  }

  /** Asserts that the stopwatch shows at least {@code expectedMillis}, within a small tolerance. */
  private static void assertAtLeastTimePassed(
      Stopwatch stopwatch, long expectedMillis) {
    long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
    /*
     * The 5 ms tolerance lets, say, sleep(10) return after only 9 milliseconds, which has been
     * seen when these tests run publicly as part of Guava. It is probably more generous than
     * necessary.
     */
    final long toleranceMillis = 5;
    String message = "Expected elapsed millis to be >= " + expectedMillis
        + " but was " + elapsedMillis;
    boolean enoughTimePassed = elapsedMillis + toleranceMillis >= expectedMillis;
    assertTrue(message, enoughTimePassed);
  }
}
// TODO(cpovirk): Split this into separate CountDownLatch and IncrementableCountDownLatch classes.
/**
 * Holds a full, capacity-one {@link BlockingQueue} and the timing data for a {@code put} call
 * that can only succeed after a background thread removes the blocking element.
 */
private static final class TimedPutQueue {
  final BlockingQueue<String> queue;
  final Completion completed;

  /**
   * Creates a {@code TimedPutQueue} whose single slot is freed by an {@link EnableWrites} task
   * after {@code countdownInMillis}.
   */
  static TimedPutQueue createWithDelay(long countdownInMillis) {
    return new TimedPutQueue(countdownInMillis);
  }

  private TimedPutQueue(long countdownInMillis) {
    BlockingQueue<String> fullQueue = new ArrayBlockingQueue<String>(1);
    this.queue = fullQueue;
    assertTrue(fullQueue.offer("blocksPutCallsUntilRemoved"));
    // The stopwatch starts before the enabler thread is launched.
    this.completed = new Completion(countdownInMillis);
    scheduleEnableWrites(fullQueue, countdownInMillis);
  }

  /**
   * Performs a {@code put}, then asserts that it finished within the expected time window and
   * that the new element is at the head of the queue.
   */
  void putSuccessfully() {
    putUninterruptibly(queue, "");
    completed.assertCompletionExpected();
    String head = queue.peek();
    assertEquals("", head);
  }

  /** Starts a thread that runs an {@link EnableWrites} task for {@code queue}. */
  private static void scheduleEnableWrites(
      BlockingQueue<String> queue, long countdownInMillis) {
    // TODO(cpovirk): automatically fail the test if this thread throws
    Runnable enabler = new EnableWrites(queue, countdownInMillis);
    new Thread(enabler).start();
  }
}
/**
 * Holds an empty, capacity-one {@link BlockingQueue} and the timing data for a {@code take}
 * call that can only succeed after a background thread inserts an element.
 */
private static final class TimedTakeQueue {
  final BlockingQueue<String> queue;
  final Completion completed;

  /**
   * Creates a {@code TimedTakeQueue} into which an {@link EnableReads} task inserts an element
   * after {@code countdownInMillis}.
   */
  static TimedTakeQueue createWithDelay(long countdownInMillis) {
    return new TimedTakeQueue(countdownInMillis);
  }

  private TimedTakeQueue(long countdownInMillis) {
    BlockingQueue<String> emptyQueue = new ArrayBlockingQueue<String>(1);
    this.queue = emptyQueue;
    // The stopwatch starts before the enabler thread is launched.
    this.completed = new Completion(countdownInMillis);
    scheduleEnableReads(emptyQueue, countdownInMillis);
  }

  /**
   * Performs a {@code take}, then asserts that it returned the expected element within the
   * expected time window and left the queue empty.
   */
  void takeSuccessfully() {
    String taken = takeUninterruptibly(queue);
    assertEquals(EXPECTED_TAKE, taken);
    completed.assertCompletionExpected();
    assertTrue(queue.isEmpty());
  }

  /** Starts a thread that runs an {@link EnableReads} task for {@code queue}. */
  private static void scheduleEnableReads(
      BlockingQueue<String> queue, long countdownInMillis) {
    // TODO(cpovirk): automatically fail the test if this thread throws
    Runnable enabler = new EnableReads(queue, countdownInMillis);
    new Thread(enabler).start();
  }
}
/**
 * Holds a {@link Semaphore} that starts with zero permits and the timing data for acquire
 * calls that can only succeed after a background thread releases permits.
 */
private static final class TimedSemaphore {
  final Semaphore semaphore;
  final Completion completed;

  /**
   * Creates a {@code TimedSemaphore} whose permits are released by a {@link Release} task
   * after {@code countdownInMillis}.
   */
  static TimedSemaphore createWithDelay(long countdownInMillis) {
    return new TimedSemaphore(countdownInMillis);
  }

  private TimedSemaphore(long countdownInMillis) {
    this.semaphore = new Semaphore(0);
    // The stopwatch starts before the releaser thread is launched.
    this.completed = new Completion(countdownInMillis);
    scheduleRelease(countdownInMillis);
  }

  /**
   * Requests one permit with a timeout and asserts that the acquire succeeded within the
   * expected time window.
   */
  void tryAcquireSuccessfully(long timeoutMillis) {
    boolean acquired = tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS);
    assertTrue(acquired);
    completed.assertCompletionExpected();
  }

  /**
   * Requests one permit with a timeout and asserts that the acquire failed after at least the
   * timeout but before the scheduled release.
   */
  private void tryAcquireUnsuccessfully(long timeoutMillis) {
    boolean acquired = tryAcquireUninterruptibly(semaphore, timeoutMillis, MILLISECONDS);
    assertFalse(acquired);
    completed.assertCompletionNotExpected(timeoutMillis);
  }

  /** Multi-permit variant of {@link #tryAcquireSuccessfully(long)}. */
  void tryAcquireSuccessfully(int permits, long timeoutMillis) {
    boolean acquired =
        tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS);
    assertTrue(acquired);
    completed.assertCompletionExpected();
  }

  /** Multi-permit variant of {@link #tryAcquireUnsuccessfully(long)}. */
  private void tryAcquireUnsuccessfully(int permits, long timeoutMillis) {
    boolean acquired =
        tryAcquireUninterruptibly(semaphore, permits, timeoutMillis, MILLISECONDS);
    assertFalse(acquired);
    completed.assertCompletionNotExpected(timeoutMillis);
  }

  /** Starts a thread that runs a {@link Release} task for this semaphore. */
  private void scheduleRelease(long countdownInMillis) {
    // TODO(cpovirk): automatically fail the test if this thread throws
    DelayedActionRunnable releaser = new Release(semaphore, countdownInMillis);
    new Thread(releaser).start();
  }
}
/**
 * A {@link Runnable} that sleeps for a fixed number of milliseconds and then invokes
 * {@link #doAction()}. An interrupt during the sleep is rethrown as an {@link AssertionError}
 * and the action is not run.
 */
private abstract static class DelayedActionRunnable implements Runnable {
  private final long delayMillis;

  protected DelayedActionRunnable(long tMinus) {
    this.delayMillis = tMinus;
  }

  @Override
  public final void run() {
    pauseBeforeAction();
    doAction();
  }

  /** Sleeps for the configured delay, converting an interrupt into an {@link AssertionError}. */
  private void pauseBeforeAction() {
    try {
      Thread.sleep(delayMillis);
    } catch (InterruptedException e) {
      throw new AssertionError(e);
    }
  }

  /** The action to perform once the delay has elapsed. */
  protected abstract void doAction();
}
/** Delayed action that counts down a {@link CountDownLatch} once. */
private static class CountDown extends DelayedActionRunnable {
  private final CountDownLatch targetLatch;

  public CountDown(CountDownLatch latch, long tMinus) {
    super(tMinus);
    this.targetLatch = latch;
  }

  @Override
  protected void doAction() {
    targetLatch.countDown();
  }
}
/**
 * Delayed action that removes an element from a queue. The constructor asserts that the queue
 * is non-empty and rejects a further {@code offer}.
 */
private static class EnableWrites extends DelayedActionRunnable {
  private final BlockingQueue<String> blockedQueue;

  public EnableWrites(BlockingQueue<String> queue, long tMinus) {
    super(tMinus);
    boolean empty = queue.isEmpty();
    assertFalse(empty);
    boolean accepted = queue.offer("shouldBeRejected");
    assertFalse(accepted);
    this.blockedQueue = queue;
  }

  @Override
  protected void doAction() {
    String removed = blockedQueue.remove();
    assertNotNull(removed);
  }
}
/**
 * Delayed action that offers {@code EXPECTED_TAKE} to a queue. The constructor asserts that
 * the queue is empty.
 */
private static class EnableReads extends DelayedActionRunnable {
  private final BlockingQueue<String> emptyQueue;

  public EnableReads(BlockingQueue<String> queue, long tMinus) {
    super(tMinus);
    boolean empty = queue.isEmpty();
    assertTrue(empty);
    this.emptyQueue = queue;
  }

  @Override
  protected void doAction() {
    boolean accepted = emptyQueue.offer(EXPECTED_TAKE);
    assertTrue(accepted);
  }
}
/**
 * Starts a thread running a {@link JoinTarget} with the given delay and keeps the timing data
 * for join calls on that thread.
 */
private static final class TimedThread {
  private final Thread thread;
  private final Completion completed;

  /** Creates and starts a thread whose {@link JoinTarget} is given {@code countdownInMillis}. */
  static TimedThread createWithDelay(long countdownInMillis) {
    return new TimedThread(countdownInMillis);
  }

  private TimedThread(long expectedCompletionWaitMillis) {
    // The stopwatch starts before the thread is created and started.
    this.completed = new Completion(expectedCompletionWaitMillis);
    this.thread = new Thread(new JoinTarget(expectedCompletionWaitMillis));
    this.thread.start();
  }

  /** Joins without a timeout and asserts the thread terminated within the expected window. */
  void joinSuccessfully() {
    Uninterruptibles.joinUninterruptibly(thread);
    completed.assertCompletionExpected();
    assertEquals(Thread.State.TERMINATED, thread.getState());
  }

  /** Joins with a timeout and asserts the thread terminated within the expected window. */
  void joinSuccessfully(long timeoutMillis) {
    Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
    completed.assertCompletionExpected();
    assertEquals(Thread.State.TERMINATED, thread.getState());
  }

  /**
   * Joins with a timeout and asserts that the join returned after at least the timeout, before
   * the expected completion, and with the thread not terminated.
   */
  void joinUnsuccessfully(long timeoutMillis) {
    Uninterruptibles.joinUninterruptibly(thread, timeoutMillis, MILLISECONDS);
    completed.assertCompletionNotExpected(timeoutMillis);
    Thread.State stateAfterJoin = thread.getState();
    assertFalse(Thread.State.TERMINATED.equals(stateAfterJoin));
  }
}
/** Delayed action that does nothing after the delay. */
private static class JoinTarget extends DelayedActionRunnable {
  public JoinTarget(long tMinus) {
    super(tMinus);
  }

  @Override
  protected void doAction() {
    // Intentionally empty: no action is performed once the delay has elapsed.
  }
}
/** Delayed action that releases ten permits on a {@link Semaphore}. */
private static class Release extends DelayedActionRunnable {
  private final Semaphore target;

  public Release(Semaphore semaphore, long tMinus) {
    super(tMinus);
    this.target = semaphore;
  }

  @Override
  protected void doAction() {
    final int permits = 10;
    target.release(permits);
  }
}
/**
 * Sleeps uninterruptibly for {@code sleepMillis} and asserts that the elapsed time matches,
 * allowing the sleep to come up short by {@code SLEEP_SLACK} milliseconds.
 */
private static void sleepSuccessfully(long sleepMillis) {
  long minimumExpectedMillis = sleepMillis - SLEEP_SLACK;
  Completion timing = new Completion(minimumExpectedMillis);
  Uninterruptibles.sleepUninterruptibly(sleepMillis, MILLISECONDS);
  timing.assertCompletionExpected();
}
/**
 * Asserts that fewer than {@code timelimitMillis} milliseconds have elapsed on
 * {@code stopwatch}.
 *
 * @param stopwatch the running stopwatch to read
 * @param timelimitMillis exclusive upper bound on the elapsed time, in milliseconds
 */
private static void assertTimeNotPassed(Stopwatch stopwatch,
    long timelimitMillis) {
  long elapsedMillis = stopwatch.elapsed(MILLISECONDS);
  // Report both values on failure so that a timing failure can be diagnosed from the test log.
  assertTrue("Expected elapsed millis to be < " + timelimitMillis
      + " but was " + elapsedMillis, elapsedMillis < timelimitMillis);
}
/**
 * Waits for an interrupt and clears the interrupt status. This resembles
 * {@code assertTrue(Thread.interrupted())}, except that an interrupt arriving late is still
 * accepted.
 */
private static void assertInterrupted() {
  try {
    /*
     * If the thread is already interrupted, sleep() throws right away; otherwise it keeps
     * waiting for the interrupt to arrive.
     */
    Thread.sleep(LONG_DELAY_MS);
  } catch (InterruptedException expected) {
    return;
  }
  fail("Dude, where's my interrupt?");
}
/** Asserts that the current thread's interrupt flag is not set (reading the flag also clears it). */
private static void assertNotInterrupted() {
  boolean wasInterrupted = Thread.interrupted();
  assertFalse(wasInterrupted);
}
/** Delegates to {@code InterruptionUtil.requestInterruptIn}, passing {@code millis} in milliseconds. */
private static void requestInterruptIn(long millis) {
  final long delayMillis = millis;
  InterruptionUtil.requestInterruptIn(delayMillis, MILLISECONDS);
}
}
Other Java examples (source code examples)
Here is a short list of links related to this Java UninterruptiblesTest.java source code file:
|