alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (LocalTransportThreadModelTest3.java)

This example Java source code file (LocalTransportThreadModelTest3.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Learn more about this Java project at its project page.

Java - Java tags/keywords

defaulteventexecutorgroup, defaultthreadfactory, eventexecutorgroup, eventforwarder, eventrecorder, eventtype, exception, ignore, linkedlist, object, override, random, test, threading, threads, throwable, util

The LocalTransportThreadModelTest3.java Java example source code

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.channel.local;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;

public class LocalTransportThreadModelTest3 {

    /**
     * Kinds of pipeline events that {@code EventRecorder} appends to its queue and that
     * {@code testConcurrentAddRemove} compares against the expected sequence.
     */
    enum EventType {
        /** Recorded by {@code EventRecorder.exceptionCaught} (in both inbound and outbound mode). */
        EXCEPTION_CAUGHT,
        /** Recorded by {@code EventRecorder.userEventTriggered} when recording inbound events. */
        USER_EVENT,
        /** Recorded by {@code EventRecorder.channelReadComplete} when recording inbound events. */
        MESSAGE_RECEIVED_LAST,
        /** Recorded by {@code EventRecorder.channelInactive}. */
        INACTIVE,
        /** Recorded by {@code EventRecorder.channelActive}. */
        ACTIVE,
        /** Recorded by {@code EventRecorder.channelUnregistered}. */
        UNREGISTERED,
        /** Recorded by {@code EventRecorder.channelRegistered}. */
        REGISTERED,
        /** Recorded by {@code EventRecorder.channelRead} when recording inbound events. */
        MESSAGE_RECEIVED,
        /** Recorded by {@code EventRecorder.write} when recording outbound events. */
        WRITE,
        /** Recorded by {@code EventRecorder.read} when recording outbound events. */
        READ
    }

    // Event loop group of the shared test server; created in init() and shut down in destroy().
    private static EventLoopGroup group;
    // Local address the test server was bound to in init(); test channels connect to it.
    private static LocalAddress localAddr;

    /**
     * Starts the shared local test server once for the whole class.
     *
     * <p>Every accepted child channel gets a single handler that releases whatever it reads.
     * The address the server ends up bound to is stored in {@code localAddr}.
     */
    @BeforeClass
    public static void init() {
        group = new DefaultEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group);
        bootstrap.channel(LocalServerChannel.class);
        bootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
            @Override
            public void initChannel(LocalChannel ch) throws Exception {
                // Received messages are dropped: release them and do not forward.
                ChannelInboundHandlerAdapter discarder = new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        ReferenceCountUtil.release(msg);
                    }
                };
                ch.pipeline().addLast(discarder);
            }
        });

        // Bind to an arbitrary local address and remember which one was assigned.
        Channel serverChannel = bootstrap.bind(LocalAddress.ANY).syncUninterruptibly().channel();
        localAddr = (LocalAddress) serverChannel.localAddress();
    }

    /**
     * Shuts down the event loop group created in {@code init()} and waits for the
     * shutdown future to complete.
     *
     * @throws Exception if waiting for the shutdown fails (for example on interruption)
     */
    @AfterClass
    public static void destroy() throws Exception {
        group.shutdownGracefully().sync();
    }

    /**
     * Runs the inbound concurrent add/remove scenario 50 times in a row.
     *
     * @throws Throwable whatever a single run throws
     */
    @Test(timeout = 60000)
    @Ignore("regression test")
    public void testConcurrentAddRemoveInboundEventsMultiple() throws Throwable {
        int remainingRuns = 50;
        while (remainingRuns > 0) {
            testConcurrentAddRemoveInboundEvents();
            remainingRuns--;
        }
    }

    /**
     * Runs the outbound concurrent add/remove scenario 50 times in a row.
     *
     * @throws Throwable whatever a single run throws
     */
    @Test(timeout = 60000)
    @Ignore("regression test")
    public void testConcurrentAddRemoveOutboundEventsMultiple() throws Throwable {
        int remainingRuns = 50;
        while (remainingRuns > 0) {
            testConcurrentAddRemoveOutboundEvents();
            remainingRuns--;
        }
    }

    /**
     * Single run of the concurrent add/remove scenario using inbound events.
     *
     * @throws Throwable whatever the scenario throws
     */
    @Test(timeout = 30000)
    @Ignore("needs a fix")
    public void testConcurrentAddRemoveInboundEvents() throws Throwable {
        final boolean inbound = true;
        testConcurrentAddRemove(inbound);
    }

    /**
     * Single run of the concurrent add/remove scenario using outbound events.
     *
     * @throws Throwable whatever the scenario throws
     */
    @Test(timeout = 30000)
    @Ignore("needs a fix")
    public void testConcurrentAddRemoveOutboundEvents() throws Throwable {
        final boolean inbound = false;
        testConcurrentAddRemove(inbound);
    }

    /**
     * Fires a random sequence of pipeline events at a {@link LocalChannel} while a background
     * thread keeps moving handlers between executor groups, then checks that the recorder at the
     * end of the pipeline saw exactly the fired events, in order, framed by
     * REGISTERED/ACTIVE at the start and INACTIVE/UNREGISTERED at the end.
     *
     * @param inbound {@code true} to fire inbound events (user event, read, read-complete) plus
     *                exceptions; {@code false} to fire outbound events (read, write) plus exceptions
     * @throws Exception if registering, connecting, closing or shutting down fails, or if the
     *                   calling thread is interrupted while waiting
     */
    private static void testConcurrentAddRemove(boolean inbound) throws Exception {
        EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
        EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
        EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
        EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
        EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
        EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));

        final EventExecutorGroup[] groups = {e1, e2, e3, e4, e5};

        // Set once the modifier thread exists so the finally block can stop it.
        Thread pipelineModifier = null;
        try {
            Deque<EventType> events = new ConcurrentLinkedDeque<>();
            final EventForwarder h1 = new EventForwarder();
            final EventForwarder h2 = new EventForwarder();
            final EventForwarder h3 = new EventForwarder();
            final EventForwarder h4 = new EventForwarder();
            final EventForwarder h5 = new EventForwarder();
            final EventRecorder h6 = new EventRecorder(events, inbound);

            final Channel ch = new LocalChannel();
            if (!inbound) {
                // NOTE(review): presumably keeps automatic reads from adding READ events beyond
                // the ones fired explicitly below — confirm against Netty's auto-read semantics.
                ch.config().setAutoRead(false);
            }
            // Five forwarders followed by the named recorder, all initially bound to e1.
            ch.pipeline().addLast(e1, h1)
                    .addLast(e1, h2)
                    .addLast(e1, h3)
                    .addLast(e1, h4)
                    .addLast(e1, h5)
                    .addLast(e1, "recorder", h6);

            l.register(ch).sync().channel().connect(localAddr).sync();

            final LinkedList<EventType> expectedEvents = events(inbound, 8192);

            Throwable cause = new Throwable();

            // Every 100 ms: take the first handler out of the pipeline and re-insert it just
            // before the recorder, bound to a randomly chosen executor group.
            pipelineModifier = new Thread(new Runnable() {
                @Override
                public void run() {
                    Random random = new Random();

                    while (true) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            // Interruption is the stop signal sent from the finally block.
                            return;
                        }
                        if (!ch.isRegistered()) {
                            continue;
                        }
                        ChannelHandler handler = ch.pipeline().removeFirst();
                        ch.pipeline().addBefore(groups[random.nextInt(groups.length)], "recorder",
                                UUID.randomUUID().toString(), handler);
                    }
                }
            });
            pipelineModifier.setDaemon(true);
            pipelineModifier.start();

            // Fire the expected events in order from the test thread.
            for (EventType event: expectedEvents) {
                switch (event) {
                    case EXCEPTION_CAUGHT:
                        ch.pipeline().fireExceptionCaught(cause);
                        break;
                    case MESSAGE_RECEIVED:
                        ch.pipeline().fireChannelRead("");
                        break;
                    case MESSAGE_RECEIVED_LAST:
                        ch.pipeline().fireChannelReadComplete();
                        break;
                    case USER_EVENT:
                        ch.pipeline().fireUserEventTriggered("");
                        break;
                    case WRITE:
                        ch.pipeline().write("");
                        break;
                    case READ:
                        ch.pipeline().read();
                        break;
                }
            }

            ch.close().sync();

            // Poll until the recorder's most recent event is UNREGISTERED.
            while (events.peekLast() != EventType.UNREGISTERED) {
                Thread.sleep(10);
            }

            // The recorder also logs the lifecycle events surrounding the fired ones.
            expectedEvents.addFirst(EventType.ACTIVE);
            expectedEvents.addFirst(EventType.REGISTERED);
            expectedEvents.addLast(EventType.INACTIVE);
            expectedEvents.addLast(EventType.UNREGISTERED);

            for (;;) {
                EventType event = events.poll();
                if (event == null) {
                    Assert.assertTrue("Missing events:" + expectedEvents, expectedEvents.isEmpty());
                    break;
                }
                // JUnit convention: expected value first, actual value second.
                Assert.assertEquals(expectedEvents.poll(), event);
            }
        } finally {
            try {
                // Stop the modifier thread before the executors go away; otherwise it would keep
                // running (and sleeping) forever after the test returns.
                if (pipelineModifier != null) {
                    pipelineModifier.interrupt();
                    pipelineModifier.join();
                }
            } finally {
                // Request shutdown of every group first, then wait for all of them.
                l.shutdownGracefully();
                e1.shutdownGracefully();
                e2.shutdownGracefully();
                e3.shutdownGracefully();
                e4.shutdownGracefully();
                e5.shutdownGracefully();

                l.terminationFuture().sync();
                e1.terminationFuture().sync();
                e2.terminationFuture().sync();
                e3.terminationFuture().sync();
                e4.terminationFuture().sync();
                e5.terminationFuture().sync();
            }
        }
    }

    /**
     * Builds a random sequence of event types to fire at the pipeline.
     *
     * @param inbound {@code true} to draw from USER_EVENT, MESSAGE_RECEIVED, MESSAGE_RECEIVED_LAST
     *                and EXCEPTION_CAUGHT; {@code false} to draw from READ, WRITE and
     *                EXCEPTION_CAUGHT
     * @param size    number of events to generate
     * @return a new list holding {@code size} randomly chosen event types
     */
    private static LinkedList<EventType> events(boolean inbound, int size) {
        EventType[] events;
        if (inbound) {
            events = new EventType[] {
                    EventType.USER_EVENT, EventType.MESSAGE_RECEIVED, EventType.MESSAGE_RECEIVED_LAST,
                    EventType.EXCEPTION_CAUGHT};
        } else {
            events = new EventType[] {
                    EventType.READ, EventType.WRITE, EventType.EXCEPTION_CAUGHT };
        }

        Random random = new Random();
        // Parameterized instead of raw to avoid an unchecked conversion.
        LinkedList<EventType> expectedEvents = new LinkedList<>();
        for (int i = 0; i < size; i++) {
            expectedEvents.add(events[random.nextInt(events.length)]);
        }
        return expectedEvents;
    }

    /**
     * Handler with no overrides of its own: every callback is inherited unchanged from
     * {@link ChannelDuplexHandler}. Annotated as sharable. The test places five instances
     * in front of the recorder and shuffles them between executor groups.
     */
    @ChannelHandler.Sharable
    private static final class EventForwarder extends ChannelDuplexHandler { }

    /**
     * Terminal handler that appends an {@link EventType} to a queue for each callback it receives
     * and does not forward anything further.
     *
     * <p>Lifecycle events and exceptions are always recorded. User events, reads and
     * read-completions are recorded only in inbound mode; write and read requests only in
     * outbound mode.
     */
    private static final class EventRecorder extends ChannelDuplexHandler {
        private final Queue<EventType> sink;
        private final boolean inboundMode;

        /**
         * @param events  queue that receives the recorded event types
         * @param inbound {@code true} to record inbound events, {@code false} to record outbound ones
         */
        EventRecorder(Queue<EventType> events, boolean inbound) {
            sink = events;
            inboundMode = inbound;
        }

        /** Appends {@code type} to the queue only when {@code enabled} is true. */
        private void recordWhen(boolean enabled, EventType type) {
            if (enabled) {
                sink.add(type);
            }
        }

        // ---- always recorded ----

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            sink.add(EventType.REGISTERED);
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            sink.add(EventType.UNREGISTERED);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            sink.add(EventType.ACTIVE);
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            sink.add(EventType.INACTIVE);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            sink.add(EventType.EXCEPTION_CAUGHT);
        }

        // ---- recorded only in inbound mode ----

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            recordWhen(inboundMode, EventType.USER_EVENT);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            recordWhen(inboundMode, EventType.MESSAGE_RECEIVED);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            recordWhen(inboundMode, EventType.MESSAGE_RECEIVED_LAST);
        }

        // ---- recorded only in outbound mode ----

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            recordWhen(!inboundMode, EventType.WRITE);
            // The write is not passed on, so the promise is completed here.
            promise.setSuccess();
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            recordWhen(!inboundMode, EventType.READ);
        }
    }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java LocalTransportThreadModelTest3.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.