|
Java example source code file (AbstractChannelHandlerContext.java)
The AbstractChannelHandlerContext.java Java example source code/* * Copyright 2012 The Netty Project * * The Netty Project licenses this file to you under the Apache License, * version 2.0 (the "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations * under the License. */ package io.netty.channel; import io.netty.buffer.ByteBufAllocator; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.ThrowableUtil; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class); volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; /** * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called. */ private static final int ADDED = 1; /** * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int REMOVED = 2; /** * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called. */ private static final int INIT = 0; private final boolean inbound; private final boolean outbound; private final DefaultChannelPipeline pipeline; private final String name; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor; private ChannelFuture succeededFuture; private int handlerState = INIT; // Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. private Runnable invokeChannelReadCompleteTask; private Runnable invokeReadTask; private Runnable invokeChannelWritableStateChangedTask; private Runnable invokeFlushTask; AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { if (name == null) { throw new NullPointerException("name"); } this.pipeline = pipeline; this.name = name; this.executor = executor; this.inbound = inbound; this.outbound = outbound; } @Override public Channel channel() { return pipeline.channel(); } @Override public ChannelPipeline pipeline() { return pipeline; } @Override public ByteBufAllocator alloc() { return channel().config().getAllocator(); } @Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } } @Override public String name() { return name; } @Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } private void invokeChannelRegistered() { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } } @Override public ChannelHandlerContext fireChannelUnregistered() { invokeChannelUnregistered(findContextInbound()); return this; } static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelUnregistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelUnregistered(); } }); } } private void invokeChannelUnregistered() { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelUnregistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelUnregistered(); } } @Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); invokeChannelActive(next); return this; } static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } } private void invokeChannelActive() { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } } @Override public ChannelHandlerContext fireChannelInactive() { invokeChannelInactive(findContextInbound()); return this; } static void invokeChannelInactive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelInactive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelInactive(); } }); } } private void invokeChannelInactive() { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelInactive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelInactive(); } } @Override public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { invokeExceptionCaught(next, cause); return this; } static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); } else { try { executor.execute(new Runnable() { @Override public void run() { next.invokeExceptionCaught(cause); } }); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); logger.warn("The exceptionCaught() event that was failed to submit was:", cause); } } } } private void invokeExceptionCaught(final Throwable cause) { if (isAdded()) { try { handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug( "An exception {}" + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", ThrowableUtil.stackTraceToString(error), cause); } else if (logger.isWarnEnabled()) { logger.warn( "An exception '{}' [enable DEBUG level for full stacktrace] " + "was thrown by a user handler's exceptionCaught() " + "method while handling the following exception:", error, cause); } } } else { fireExceptionCaught(cause); } } @Override public ChannelHandlerContext fireUserEventTriggered(final Object event) { invokeUserEventTriggered(findContextInbound(), event); return this; } static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) { ObjectUtil.checkNotNull(event, "event"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeUserEventTriggered(event); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeUserEventTriggered(event); } }); } } private void invokeUserEventTriggered(Object event) { if (isAdded()) { try { ((ChannelInboundHandler) handler()).userEventTriggered(this, event); } catch (Throwable t) { notifyHandlerException(t); } } else { fireUserEventTriggered(event); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; } static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } @Override public ChannelHandlerContext fireChannelReadComplete() { invokeChannelReadComplete(findContextInbound()); return this; } static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelReadComplete(); } else { Runnable task = next.invokeChannelReadCompleteTask; if (task == null) { next.invokeChannelReadCompleteTask = task = new Runnable() { @Override public void run() { next.invokeChannelReadComplete(); } }; } executor.execute(task); } } private void invokeChannelReadComplete() { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelReadComplete(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelReadComplete(); } } @Override public ChannelHandlerContext fireChannelWritabilityChanged() { invokeChannelWritabilityChanged(findContextInbound()); return this; } static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelWritabilityChanged(); } else { Runnable task = next.invokeChannelWritableStateChangedTask; if (task == null) { next.invokeChannelWritableStateChangedTask = task = new Runnable() { @Override public void run() { next.invokeChannelWritabilityChanged(); } }; } executor.execute(task); } } private void invokeChannelWritabilityChanged() { if (isAdded()) { try { ((ChannelInboundHandler) handler()).channelWritabilityChanged(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelWritabilityChanged(); } } @Override public ChannelFuture bind(SocketAddress localAddress) { return bind(localAddress, newPromise()); } @Override public ChannelFuture connect(SocketAddress remoteAddress) { return connect(remoteAddress, newPromise()); } @Override public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { return connect(remoteAddress, localAddress, newPromise()); } @Override public ChannelFuture disconnect() { return disconnect(newPromise()); } @Override public ChannelFuture close() { return close(newPromise()); } @Override public ChannelFuture deregister() { return deregister(newPromise()); } @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (isAdded()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } @Override public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { return connect(remoteAddress, null, promise); } @Override public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); } }, promise, null); } return promise; } private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { if (isAdded()) { try { ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { connect(remoteAddress, localAddress, promise); } } @Override public ChannelFuture disconnect(final ChannelPromise promise) { if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { // Translate disconnect to close if the channel has no notion of disconnect-reconnect. // So far, UDP/IP is the only transport that has such behavior. if (!channel().metadata().hasDisconnect()) { next.invokeClose(promise); } else { next.invokeDisconnect(promise); } } else { safeExecute(executor, new Runnable() { @Override public void run() { if (!channel().metadata().hasDisconnect()) { next.invokeClose(promise); } else { next.invokeDisconnect(promise); } } }, promise, null); } return promise; } private void invokeDisconnect(ChannelPromise promise) { if (isAdded()) { try { ((ChannelOutboundHandler) handler()).disconnect(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { disconnect(promise); } } @Override public ChannelFuture close(final ChannelPromise promise) { if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeClose(promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeClose(promise); } }, promise, null); } return promise; } private void invokeClose(ChannelPromise promise) { if (isAdded()) { try { ((ChannelOutboundHandler) handler()).close(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { close(promise); } } @Override public ChannelFuture deregister(final ChannelPromise promise) { if (!validatePromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeDeregister(promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeDeregister(promise); } }, promise, null); } return promise; } private void invokeDeregister(ChannelPromise promise) { if (isAdded()) { try { ((ChannelOutboundHandler) handler()).deregister(this, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { deregister(promise); } } @Override public ChannelHandlerContext read() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null) { next.invokeReadTask = task = new Runnable() { @Override public void run() { next.invokeRead(); } }; } executor.execute(task); } return this; } private void invokeRead() { if (isAdded()) { try { ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); } } else { read(); } } @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } try { if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } write(msg, false, promise); return promise; } private void invokeWrite(Object msg, ChannelPromise promise) { if (isAdded()) { invokeWrite0(msg, promise); } else { write(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } @Override public ChannelHandlerContext flush() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeFlush(); } else { Runnable task = next.invokeFlushTask; if (task == null) { next.invokeFlushTask = task = new Runnable() { @Override public void run() { next.invokeFlush(); } }; } safeExecute(executor, task, channel().voidPromise(), null); } return this; } private void invokeFlush() { if (isAdded()) { invokeFlush0(); } else { flush(); } } private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { notifyHandlerException(t); } } @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } write(msg, true, promise); return promise; } private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (isAdded()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } } private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } @Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { if (!promise.tryFailure(cause) && !(promise instanceof VoidChannelPromise)) { if (logger.isWarnEnabled()) { logger.warn("Failed to fail the promise because it's done already: {}", promise, cause); } } } private void notifyHandlerException(Throwable cause) { if (inExceptionCaught(cause)) { if (logger.isWarnEnabled()) { logger.warn( "An exception was thrown by a user handler " + "while handling an exceptionCaught event", cause); } return; } invokeExceptionCaught(cause); } private static boolean inExceptionCaught(Throwable cause) { do { StackTraceElement[] trace = cause.getStackTrace(); if (trace != null) { for (StackTraceElement t : trace) { if (t == null) { break; } if ("exceptionCaught".equals(t.getMethodName())) { return true; } } } cause = cause.getCause(); } while (cause != null); return false; } @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); } @Override public ChannelProgressivePromise newProgressivePromise() { return new DefaultChannelProgressivePromise(channel(), executor()); } @Override public ChannelFuture newSucceededFuture() { ChannelFuture succeededFuture = this.succeededFuture; if (succeededFuture == null) { this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor()); } return succeededFuture; } @Override public ChannelFuture newFailedFuture(Throwable cause) { return new FailedChannelFuture(channel(), executor(), cause); } private boolean validatePromise(ChannelPromise promise, boolean allowVoidPromise) { if (promise == null) { throw new NullPointerException("promise"); } if (promise.isDone()) { // Check if the promise was cancelled and if so signal that the processing of the operation // should not be performed. // // See https://github.com/netty/netty/issues/2349 if (promise.isCancelled()) { return false; } throw new IllegalArgumentException("promise already done: " + promise); } if (promise.channel() != channel()) { throw new IllegalArgumentException(String.format( "promise.channel does not match: %s (expected: %s)", promise.channel(), channel())); } if (promise.getClass() == DefaultChannelPromise.class) { return true; } if (!allowVoidPromise && promise instanceof VoidChannelPromise) { throw new IllegalArgumentException( StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation"); } if (promise instanceof AbstractChannel.CloseFuture) { throw new IllegalArgumentException( StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline"); } return true; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; } private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; } @Override public ChannelPromise voidPromise() { return channel().voidPromise(); } final void setRemoved() { handlerState = REMOVED; } final void setAdded() { handlerState = ADDED; } /** * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called * yet. If not return {@code false} and if called or could not detect return {@code true}. * * If this method returns {@code true} we will not invoke the {@link ChannelHandler} but just forward the event. * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list * but not called {@link } */ private boolean isAdded() { return handlerState == ADDED; } @Override public boolean isRemoved() { return handlerState == REMOVED; } @Override public <T> Attribute Other Java examples (source code examples)Here is a short list of links related to this Java AbstractChannelHandlerContext.java source code file: |
... this post is sponsored by my books ... | |
#1 New Release! |
FP Best Seller |
Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.
A percentage of advertising revenue from
pages under the /java/jwarehouse
URI on this website is
paid back to open source projects.