alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Axis 2 example source code file (ParallelAsyncTests.java)

This example Axis 2 source code file (ParallelAsyncTests.java) is included in the DevDaily.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Java - Axis 2 tags/keywords

asyncport, asyncport, asyncservice, callbackhandler, exception, exception, executorservice, future, future, message, null, object, string, string, threading, threads, util, xml

The Axis 2 ParallelAsyncTests.java source code

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.axis2.jaxws.sample;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import javax.xml.ws.BindingProvider;
import javax.xml.ws.Response;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.axis2.jaxws.sample.parallelasync.common.CallbackHandler;
import org.apache.axis2.jaxws.sample.parallelasync.server.AsyncPort;
import org.apache.axis2.jaxws.sample.parallelasync.server.AsyncService;
import org.apache.axis2.jaxws.TestLogger;
import org.apache.log4j.BasicConfigurator;
import org.test.parallelasync.CustomAsyncResponse;
import org.test.parallelasync.SleepResponse;
import org.test.parallelasync.WakeUpResponse;

/**
 * Tests for Asynchrony in JAX-WS. Most of the simple invokeAsync/async
 * exceptions have been covered under jaxws.dispatch and jaxws.proxy test suites
 * 
 * ExecutionException tests are covered in jaxws.dispatch and jaxws.proxy
 */
public class ParallelAsyncTests extends TestCase {

    private static final String DOCLITWR_ASYNC_ENDPOINT =
        "http://localhost:8080/axis2/services/AsyncService";

    // used for logging
    private String myClassName = "ParallelAsyncTests";

    public ParallelAsyncTests(String str) {
        super(str);
    }

    public static Test suite() {
        TestSuite suite = new TestSuite(ParallelAsyncTests.class);
        return suite;
        
    }

    public void setUp() {
        TestLogger.logger.debug("==================== " + getName());
    }
    
    public void testNOOP () {}

    /**
     * @testStrategy Check that the web service is up and running 
     *               before running any other tests
     */
    public void testService_isAlive() throws Exception {
        final String MESSAGE = "testServiceAlive";

        String title = myClassName + " : " + getName() + " : ";

        AsyncPort port = getPort((Executor)null);

        String req1base = "sleepAsync";
        String req2base = "remappedAsync";

        String request1 = null;
        String request2 = null;

        for (int i = 0; i < 10; i++) {
            
            request1 = req1base + "_" + i;
            request2 = req2base + "_" + i;

            TestLogger.logger.debug(title + "iteration [" + i + "] using request1 [" + request1 +
                    "]  request2 [" + request2 + "]");

            // submit request #1 to the server-side web service that 
            // the web service will keep until we ask for it
            Response<SleepResponse> resp1 = port.sleepAsync(request1);

            // submit request #2 to the server that essentially processes
            // without delay
            Response<CustomAsyncResponse> resp2 = port.remappedAsync(request2);

            // wait until the response for request #2 is done 
            waitBlocking(resp2);

            // check the waiting request #1
            String asleep = port.isAsleep(request1);
            //System.out.println(title+"iteration ["+i+"]   port.isAsleep(request1 ["+request1+"]) = ["+asleep+"]");

            // wakeup the waiting request #1
            String wake = port.wakeUp(request1);
            //System.out.println(title+"iteration ["+i+"]   port.wakeUp(request1 ["+request1+"]) = ["+wake+"]");

            // wait until the response for request #1 is done
            waitBlocking(resp1);
        
            // get the responses
            String req1_result = null;
            String req2_result = null;

            try {
                req1_result = resp1.get().getMessage();
                req2_result = resp2.get().getResponse();
            } catch (Exception e) {
                TestLogger.logger.debug(
                        title + "iteration [" + i + "] using request1 [" + request1 +
                                "]  request2 [" + request2 + "] :  got exception [" +
                                e.getClass().getName() + "]  [" + e.getMessage() + "] ");
                e.printStackTrace();
                fail(e.toString());
            }

            // check status on request #1
            assertEquals("sleepAsync did not sleep as expected", request1, asleep);
            assertEquals("sleepAsync did not return expected response ", request1, req1_result);

            // check status on request #2
            assertEquals("remappedAsync did not return expected response", request2, req2_result);
            

            // Calling get() again should return the same object as the first call to get()
            assertEquals("sleepAsync did not return expected response ", request1, resp1.get().getMessage());
            assertEquals("remappedAsync did not return expected response", request2, resp2.get().getResponse());
            
        }
        
        // check the callback operation
		CallbackHandler<SleepResponse> sleepCallbackHandler = new CallbackHandler();

        request1 = req1base + "_with_Callback";
        //System.out.println(title+" port.sleepAsync("+request1+", callbackHander)  being submitted....");
		Future<?> sr = port.sleepAsync(request1, sleepCallbackHandler);

        // wait a bit for the server to process the request ...
        Thread.sleep(500);

        // check the waiting request 
        String asleepWithCallback = port.isAsleep(request1);
        //System.out.println(title+" port.isAsleep("+request1+") = ["+asleepWithCallback+"]");

        // wakeup the waiting request
        String wake = port.wakeUp(request1);
        //System.out.println(title+" port.wakeUp("+request1+") = ["+wake+"]");

        // wait a bit..
        Thread.sleep(500);

        // get the response
        String req_cb_result = null;

        try {

            SleepResponse sleepResp = sleepCallbackHandler.get();

            if (sleepResp != null)
            {
                req_cb_result = sleepResp.getMessage();
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] :  result [" + req_cb_result + "] ");
            }

        } catch (Exception ex) {
            TestLogger.logger.debug(title + " request [" + request1 + "] :  got exception [" +
                    ex.getClass().getName() + "]  [" + ex.getMessage() + "] ");
            ex.printStackTrace();
            fail(ex.toString());
        }

        // check status on request
        assertEquals("sleepAsync with callback did not sleep as expected", request1, req_cb_result);

    }


    /**
     * @testStrategy Test for ordering an executor to shutdownNow while there
     *               is a request being processed.  Uses the default executor.
     *               
     */
    public void testService_ExecutorShutdownNow() throws Exception {
        final String MESSAGE = "testExecutorShutdownNow";

        String title = myClassName + " : " + getName() + " : ";

        AsyncService service = getService(null);
        AsyncPort port = getPort(service);

		// get the default executor and check to make sure it is an executor service
        ExecutorService ex = null;
        Executor executor = service.getExecutor();
        if ((executor != null) && (executor instanceof ExecutorService))
        {
            ex = (ExecutorService) executor;
        }
        else
        {
            TestLogger.logger.debug(title + " No executor service available. Nothing to test.");
            return;
        }


        // submit a request to the server that will wait until we ask for it
		CallbackHandler<SleepResponse> sleepCallbackHandler1 = new CallbackHandler();

        String request1 = "sleepAsync_with_Callback_1";

        TestLogger.logger.debug(title + " port.sleepAsync(" + request1 +
                ", callbackHander1)  #1 being submitted....");
		Future<?> sr1 = port.sleepAsync(request1, sleepCallbackHandler1);
        TestLogger.logger.debug(
                title + " port.sleepAsync(" + request1 + ", callbackHander1)  #1 .....submitted.");

        // wait a bit to make sure that the server has the request
        Thread.sleep(1000);

		// tell the executor to shutdown immediately, which 
        // attempts to stop all actively executing tasks via Thread.interrupt()
        // and should prevent new tasks from being submitted
        TestLogger.logger
                .debug(title + " shutting down executor [" + ex.getClass().getName() + "]");
        ex.shutdownNow();

        // check the waiting request 
        TestLogger.logger.debug(title + " port.isAsleep(" + request1 + ") #1 being submitted....");
        String asleepWithCallback1 = port.isAsleep(request1);
        TestLogger.logger.debug(
                title + " port.isAsleep(" + request1 + ") #1 = [" + asleepWithCallback1 + "]");

        // wakeup the waiting request
        TestLogger.logger.debug(title + " port.wakeUp(request1) #1 being submitted....");
        String wake1 = port.wakeUp(request1);
        TestLogger.logger.debug(title + " port.wakeUp(" + request1 + ") #1 = [" + wake1 + "]");

        // wait a bit..
        Thread.sleep(2000);

        // check the Future
        if (sr1.isDone())
        {
            TestLogger.logger.debug(title + " sr1.isDone[TRUE] ");
        }

        // try to get the response
        boolean gotException = false;
        try {

            SleepResponse sleepResp1 = sleepCallbackHandler1.get();

            if (sleepResp1 != null)
            {
                TestLogger.logger.debug(title + " request [" + request1 +
                        "] #1:  sleepResponse [NOT NULL] from callback handler");
                String result1 = sleepResp1.getMessage();
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] #1:  result [" + result1 + "] ");
            }
            else
            {
                TestLogger.logger.debug(title + " request [" + request1 +
                        "] #1:  sleepResponse [NULL] from callback handler");

                // see what the Future says
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] #1:  ....check Future response...");
                Object futureResult = sr1.get();
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] #1:  ....Future response [" +
                                futureResult + "]...");
            }

        } catch (Exception exc) {

            TestLogger.logger.debug(title + " request [" + request1 + "] :  got exception [" +
                    exc.getClass().getName() + "]  [" + exc.getMessage() + "] ");
            gotException = true;
        }

        assertTrue("Did not receive an exception from trying to access the response when the executor service is shutdown.",gotException);
    }


    /**
     * @testStrategy Test for ordering an executor to shutdownNow while there
     *               is a request being processed.  Uses an application executor
     *               service.
     */
    public void testService_ExecutorShutdownNow_2() throws Exception {
        final String MESSAGE = "testExecutorShutdownNow_2";

        String title = myClassName + " : " + getName() + " : ";

        AsyncService service = getService(null);
        AsyncPort port = getPort(service);

		// get the default executor and check to make sure it is an executor service
		ExecutorService ex = Executors.newSingleThreadExecutor();
		service.setExecutor(ex);


        // submit a request to the server that will wait until we ask for it
		CallbackHandler<SleepResponse> sleepCallbackHandler1 = new CallbackHandler();

        String request1 = "sleepAsync_with_Callback_1";

        TestLogger.logger.debug(title + " port.sleepAsync(" + request1 +
                ", callbackHander1)  #1 being submitted....");
		Future<?> sr1 = port.sleepAsync(request1, sleepCallbackHandler1);
        TestLogger.logger.debug(
                title + " port.sleepAsync(" + request1 + ", callbackHander1)  #1 .....submitted.");

        // wait a bit to make sure that the server has the request
        Thread.sleep(1000);

		// tell the executor to shutdown immediately, which 
        // attempts to stop all actively executing tasks via Thread.interrupt()
        // and should prevent new tasks from being submitted
        TestLogger.logger
                .debug(title + " shutting down executor [" + ex.getClass().getName() + "]");
        ex.shutdownNow();

        // check the waiting request 
        TestLogger.logger.debug(title + " port.isAsleep(" + request1 + ") #1 being submitted....");
        String asleepWithCallback1 = port.isAsleep(request1);
        TestLogger.logger.debug(
                title + " port.isAsleep(" + request1 + ") #1 = [" + asleepWithCallback1 + "]");

        // wakeup the waiting request
        TestLogger.logger.debug(title + " port.wakeUp(request1) #1 being submitted....");
        String wake1 = port.wakeUp(request1);
        TestLogger.logger.debug(title + " port.wakeUp(" + request1 + ") #1 = [" + wake1 + "]");

        // wait a bit..
        Thread.sleep(2000);

        // check the Future
        if (sr1.isDone())
        {
            TestLogger.logger.debug(title + " sr1.isDone[TRUE] ");
        }

        // try to get the response
        boolean gotException = false;
        try {

            SleepResponse sleepResp1 = sleepCallbackHandler1.get();

            if (sleepResp1 != null)
            {
                TestLogger.logger.debug(title + " request [" + request1 +
                        "] #1:  sleepResponse [NOT NULL] from callback handler");
                String result1 = sleepResp1.getMessage();
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] #1:  result [" + result1 + "] ");
            }
            else
            {
                TestLogger.logger.debug(title + " request [" + request1 +
                        "] #1:  sleepResponse [NULL] from callback handler");

                // see what the Future says
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] #1:  ....check Future response...");
                Object futureResult = sr1.get();
                TestLogger.logger.debug(
                        title + " request [" + request1 + "] #1:  ....Future response [" +
                                futureResult + "]...");
            }

        } catch (Exception exc) {

            TestLogger.logger.debug(title + " request [" + request1 + "] :  got exception [" +
                    exc.getClass().getName() + "]  [" + exc.getMessage() + "] ");
            gotException = true;
        }

        assertTrue("Did not receive an exception from trying to access the response when the executor service is shutdown.",gotException);
    }

    /**
     * @testStrategy Test for ordering an executor to shutdownNow before there
     *               is a request.  Uses the default executor.
     *               
     */
    public void testService_ExecutorShutdownNow_3() throws Exception {
        final String MESSAGE = "testExecutorShutdownNow_3";

        String title = myClassName + " : " + getName() + " : ";

        AsyncService service = getService(null);
        AsyncPort port = getPort(service);

		// get the default executor and check to make sure it is an executor service
        ExecutorService ex = null;
        Executor executor = service.getExecutor();
        if ((executor != null) && (executor instanceof ExecutorService))
        {
            ex = (ExecutorService) executor;

            // tell the executor to shutdown immediately, which 
            // attempts to stop all actively executing tasks via Thread.interrupt()
            // and should prevent new tasks from being submitted
            TestLogger.logger
                    .debug(title + " shutting down executor [" + ex.getClass().getName() + "]");
            ex.shutdownNow();
        }
        else
        {
            TestLogger.logger.debug(title + " No executor service available. Nothing to test.");
            return;
        }


        boolean gotRequestException = false;

        String request1 = "sleepAsync_with_Callback_1";
        CallbackHandler<SleepResponse> sleepCallbackHandler1 = new CallbackHandler();
        Future<?> sr1 = null;

        try
        {
            // submit a request to the server that will wait until we ask for it
            TestLogger.logger.debug(title + " port.sleepAsync(" + request1 +
                    ", callbackHander1)  #1 being submitted....");
            sr1 = port.sleepAsync(request1, sleepCallbackHandler1);
            TestLogger.logger.debug(title + " port.sleepAsync(" + request1 +
                    ", callbackHander1)  #1 .....submitted.");
        }
        catch (Exception exc)
        {
            TestLogger.logger.debug(title + " request [" + request1 + "] :  got exception [" +
                    exc.getClass().getName() + "]  [" + exc.getMessage() + "] ");
            gotRequestException = true;
        }

        // if the request went through, continue processing to see if the response is stopped
        // this makes sure that the server doesn't keep the request forever
        boolean gotResponseException = false;

        if (!gotRequestException)
        {
            // wakeup the waiting request
            TestLogger.logger.debug(title + " port.wakeUp(request1) #1 being submitted....");
            String wake1 = port.wakeUp(request1);
            TestLogger.logger.debug(title + " port.wakeUp(" + request1 + ") #1 = [" + wake1 + "]");

            // try to get the response
            try {

                SleepResponse sleepResp1 = sleepCallbackHandler1.get();

                if (sleepResp1 != null)
                {
                    TestLogger.logger.debug(title + " request [" + request1 +
                            "] #1:  sleepResponse [NOT NULL] from callback handler");
                    String result1 = sleepResp1.getMessage();
                    TestLogger.logger.debug(
                            title + " request [" + request1 + "] #1:  result [" + result1 + "] ");
                }
                else
                {
                    TestLogger.logger.debug(title + " request [" + request1 +
                            "] #1:  sleepResponse [NULL] from callback handler");

                    // see what the Future says
                    TestLogger.logger.debug(title + " request [" + request1 +
                            "] #1:  ....check Future response...");
                    Object futureResult = sr1.get();
                    TestLogger.logger.debug(title + " request [" + request1 +
                            "] #1:  ....Future response [" + futureResult + "]...");
                }

            } catch (Exception exc) {

                TestLogger.logger.debug(title + " request [" + request1 + "] :  got exception [" +
                        exc.getClass().getName() + "]  [" + exc.getMessage() + "] ");
                gotResponseException = true;
            }
        }

        assertTrue("Did not receive an exception from trying to submit the request when the executor service is shutdown.",gotRequestException);

        //assertTrue("Did not receive an exception from trying to access the response when the executor service is shutdown.",gotResponseException);
    }




    /**
     * Auxiliary method used for doing isAsleep checks. Will perform isAsleep
     * up to a MAX_ISASLEEP_CHECK number of checks. Will sleep for
     * SLEEP_ISASLEEP_SEC seconds in between requests. If reaches maximum number
     * fo retries then will fail the test
     */
    private boolean isAsleepCheck(String MESSAGE, AsyncPort port) {
        boolean asleep = false;
        int check = 30;
        String msg = null;
        do {
            msg = port.isAsleep(MESSAGE);
            asleep = (msg != null);

            // fail the test if we ran out of checks
            if ((check--) == 0)
                fail("Serve did not receive sleep after several retries");

            // sleep for a bit
            try {
                Thread.sleep(30);
            } 
            catch (InterruptedException e) {
            }

        } while (!asleep);

        if (asleep) {
            assertTrue("Sleeping on an incorrect message", MESSAGE.equals(msg));
        }

        return true;
    }
    

    private AsyncService getService(Executor ex) {
        AsyncService service = new AsyncService();

        if (ex!= null)
            service.setExecutor(ex);
        
        if (service.getExecutor() == null)
        {
            TestLogger.logger.debug(myClassName + " : getService() : executor is null");
        }
        else
        {
            TestLogger.logger.debug(myClassName + " : getService() : executor is available ");
        }

        return service;
    }


    private AsyncPort getPort(AsyncService service) {

        AsyncPort port = service.getAsyncPort();
        assertNotNull("Port is null", port);

        Map<String, Object> rc = ((BindingProvider) port).getRequestContext();
        rc.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
                DOCLITWR_ASYNC_ENDPOINT);
        
        return port;

    }

    /**
     * Auxiliary method used for obtaining a proxy pre-configured with a
     * specific Executor
     */
    private AsyncPort getPort(Executor ex) {
        AsyncService service = getService(ex);

        AsyncPort port = service.getAsyncPort();
        assertNotNull("Port is null", port);

        Map<String, Object> rc = ((BindingProvider) port).getRequestContext();
        rc.put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY,
                DOCLITWR_ASYNC_ENDPOINT);
        
        return port;
    }
    
    private void waitBlocking(Future<?> monitor){
        while (!monitor.isDone()){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }
}

Other Axis 2 examples (source code examples)

Here is a short list of links related to this Axis 2 ParallelAsyncTests.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.