alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (DefaultHttp2RemoteFlowController.java)

This example Java source code file (DefaultHttp2RemoteFlowController.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

booleansupplier, caught, defaulthttp2remoteflowcontroller, flowcontrolled, flowstate, http2exception, http2streamvisitor, invalid, listener, listenerwritabilitymonitor, override, streambytedistributor, throwable, util, writabilitymonitor

The DefaultHttp2RemoteFlowController.java Java example source code

/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package io.netty.handler.codec.http2;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http2.StreamByteDistributor.Writer;
import io.netty.util.BooleanSupplier;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.ArrayDeque;
import java.util.Deque;

import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.streamError;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * Basic implementation of {@link Http2RemoteFlowController}.
 * <p>
 * This class is <strong>NOT thread safe. The assumption is all methods must be invoked from a single thread.
 * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
 */
@UnstableApi
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(DefaultHttp2RemoteFlowController.class);
    private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
    private final Http2Connection connection;
    private final Http2Connection.PropertyKey stateKey;
    private final StreamByteDistributor streamByteDistributor;
    private final FlowState connectionState;
    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
    private WritabilityMonitor monitor;
    private ChannelHandlerContext ctx;

    public DefaultHttp2RemoteFlowController(Http2Connection connection) {
        this(connection, (Listener) null);
    }

    public DefaultHttp2RemoteFlowController(Http2Connection connection,
                                            StreamByteDistributor streamByteDistributor) {
        this(connection, streamByteDistributor, null);
    }

    public DefaultHttp2RemoteFlowController(Http2Connection connection, final Listener listener) {
        this(connection, new WeightedFairQueueByteDistributor(connection), listener);
    }

    public DefaultHttp2RemoteFlowController(Http2Connection connection,
                                            StreamByteDistributor streamByteDistributor,
                                            final Listener listener) {
        this.connection = checkNotNull(connection, "connection");
        this.streamByteDistributor = checkNotNull(streamByteDistributor, "streamWriteDistributor");

        // Add a flow state for the connection.
        stateKey = connection.newKey();
        connectionState = new FlowState(connection.connectionStream());
        connection.connectionStream().setProperty(stateKey, connectionState);

        // Monitor may depend upon connectionState, and so initialize after connectionState
        listener(listener);
        monitor.windowSize(connectionState, initialWindowSize);

        // Register for notification of new streams.
        connection.addListener(new Http2ConnectionAdapter() {
            @Override
            public void onStreamAdded(Http2Stream stream) {
                // If the stream state is not open then the stream is not yet eligible for flow controlled frames and
                // only requires the ReducedFlowState. Otherwise the full amount of memory is required.
                stream.setProperty(stateKey, new FlowState(stream));
            }

            @Override
            public void onStreamActive(Http2Stream stream) {
                // If the object was previously created, but later activated then we have to ensure the proper
                // initialWindowSize is used.
                monitor.windowSize(state(stream), initialWindowSize);
            }

            @Override
            public void onStreamClosed(Http2Stream stream) {
                // Any pending frames can never be written, cancel and
                // write errors for any pending frames.
                state(stream).cancel();
            }

            @Override
            public void onStreamHalfClosed(Http2Stream stream) {
                if (HALF_CLOSED_LOCAL.equals(stream.state())) {
                    /**
                     * When this method is called there should not be any
                     * pending frames left if the API is used correctly. However,
                     * it is possible that a erroneous application can sneak
                     * in a frame even after having already written a frame with the
                     * END_STREAM flag set, as the stream state might not transition
                     * immediately to HALF_CLOSED_LOCAL / CLOSED due to flow control
                     * delaying the write.
                     *
                     * This is to cancel any such illegal writes.
                     */
                    state(stream).cancel();
                }
            }
        });
    }

    /**
     * {@inheritDoc}
     * <p>
     * Any queued {@link FlowControlled} objects will be sent.
     */
    @Override
    public void channelHandlerContext(ChannelHandlerContext ctx) throws Http2Exception {
        this.ctx = checkNotNull(ctx, "ctx");

        // Writing the pending bytes will not check writability change and instead a writability change notification
        // to be provided by an explicit call.
        channelWritabilityChanged();

        // Don't worry about cleaning up queued frames here if ctx is null. It is expected that all streams will be
        // closed and the queue cleanup will occur when the stream state transitions occur.

        // If any frames have been queued up, we should send them now that we have a channel context.
        if (isChannelWritable()) {
            writePendingBytes();
        }
    }

    @Override
    public ChannelHandlerContext channelHandlerContext() {
        return ctx;
    }

    @Override
    public void initialWindowSize(int newWindowSize) throws Http2Exception {
        assert ctx == null || ctx.executor().inEventLoop();
        monitor.initialWindowSize(newWindowSize);
    }

    @Override
    public int initialWindowSize() {
        return initialWindowSize;
    }

    @Override
    public int windowSize(Http2Stream stream) {
        return state(stream).windowSize();
    }

    @Override
    public boolean isWritable(Http2Stream stream) {
        return monitor.isWritable(state(stream));
    }

    @Override
    public void channelWritabilityChanged() throws Http2Exception {
        monitor.channelWritabilityChange();
    }

    private boolean isChannelWritable() {
        return ctx != null && isChannelWritable0();
    }

    private boolean isChannelWritable0() {
        return ctx.channel().isWritable();
    }

    @Override
    public void listener(Listener listener) {
        monitor = listener == null ? new WritabilityMonitor() : new ListenerWritabilityMonitor(listener);
    }

    @Override
    public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
        assert ctx == null || ctx.executor().inEventLoop();
        monitor.incrementWindowSize(state(stream), delta);
    }

    @Override
    public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
        // The context can be null assuming the frame will be queued and send later when the context is set.
        assert ctx == null || ctx.executor().inEventLoop();
        checkNotNull(frame, "frame");
        try {
            monitor.enqueueFrame(state(stream), frame);
        } catch (Throwable t) {
            frame.error(ctx, t);
        }
    }

    @Override
    public boolean hasFlowControlled(Http2Stream stream) {
        return state(stream).hasFrame();
    }

    private FlowState state(Http2Stream stream) {
        return (FlowState) stream.getProperty(stateKey);
    }

    /**
     * Returns the flow control window for the entire connection.
     */
    private int connectionWindowSize() {
        return connectionState.windowSize();
    }

    private int minUsableChannelBytes() {
        // The current allocation algorithm values "fairness" and doesn't give any consideration to "goodput". It
        // is possible that 1 byte will be allocated to many streams. In an effort to try to make "goodput"
        // reasonable with the current allocation algorithm we have this "cheap" check up front to ensure there is
        // an "adequate" amount of connection window before allocation is attempted. This is not foolproof as if the
        // number of streams is >= this minimal number then we may still have the issue, but the idea is to narrow the
        // circumstances in which this can happen without rewriting the allocation algorithm.
        return max(ctx.channel().config().getWriteBufferLowWaterMark(), MIN_WRITABLE_CHUNK);
    }

    private int maxUsableChannelBytes() {
        // If the channel isWritable, allow at least minUseableChannelBytes.
        int channelWritableBytes = (int) min(Integer.MAX_VALUE, ctx.channel().bytesBeforeUnwritable());
        int useableBytes = channelWritableBytes > 0 ? max(channelWritableBytes, minUsableChannelBytes()) : 0;

        // Clip the usable bytes by the connection window.
        return min(connectionState.windowSize(), useableBytes);
    }

    /**
     * The amount of bytes that can be supported by underlying {@link io.netty.channel.Channel} without
     * queuing "too-much".
     */
    private int writableBytes() {
        return min(connectionWindowSize(), maxUsableChannelBytes());
    }

    @Override
    public void writePendingBytes() throws Http2Exception {
        monitor.writePendingBytes();
    }

    /**
     * The remote flow control state for a single stream.
     */
    private final class FlowState implements StreamByteDistributor.StreamState {
        private final Http2Stream stream;
        private final Deque<FlowControlled> pendingWriteQueue;
        private int window;
        private int pendingBytes;
        private boolean markedWritable;

        /**
         * Set to true while a frame is being written, false otherwise.
         */
        private boolean writing;
        /**
         * Set to true if cancel() was called.
         */
        private boolean cancelled;
        private BooleanSupplier isWritableSupplier = new BooleanSupplier() {
            @Override
            public boolean get() throws Exception {
                return windowSize() - pendingBytes() > 0;
            }
        };

        FlowState(Http2Stream stream) {
            this.stream = stream;
            pendingWriteQueue = new ArrayDeque<FlowControlled>(2);
        }

        /**
         * Determine if the stream associated with this object is writable.
         * @return {@code true} if the stream associated with this object is writable.
         */
        boolean isWritable() {
            try {
                return isWritableSupplier.get();
            } catch (Throwable cause) {
                throw new Error("isWritableSupplier should never throw!", cause);
            }
        }

        /**
         * The stream this state is associated with.
         */
        @Override
        public Http2Stream stream() {
            return stream;
        }

        /**
         * Returns the parameter from the last call to {@link #markedWritability(boolean)}.
         */
        boolean markedWritability() {
            return markedWritable;
        }

        /**
         * Save the state of writability.
         */
        void markedWritability(boolean isWritable) {
            this.markedWritable = isWritable;
        }

        @Override
        public int windowSize() {
            return window;
        }

        /**
         * Reset the window size for this stream.
         */
        void windowSize(int initialWindowSize) {
            window = initialWindowSize;
        }

        /**
         * Write the allocated bytes for this stream.
         * @return the number of bytes written for a stream or {@code -1} if no write occurred.
         */
        int writeAllocatedBytes(int allocated) {
            final int initialAllocated = allocated;
            int writtenBytes;
            // In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
            Throwable cause = null;
            FlowControlled frame;
            try {
                assert !writing;
                writing = true;

                // Write the remainder of frames that we are allowed to
                boolean writeOccurred = false;
                while (!cancelled && (frame = peek()) != null) {
                    int maxBytes = min(allocated, writableWindow());
                    if (maxBytes <= 0 && frame.size() > 0) {
                        // The frame still has data, but the amount of allocated bytes has been exhausted.
                        // Don't write needless empty frames.
                        break;
                    }
                    writeOccurred = true;
                    int initialFrameSize = frame.size();
                    try {
                        frame.write(ctx, max(0, maxBytes));
                        if (frame.size() == 0) {
                            // This frame has been fully written, remove this frame and notify it.
                            // Since we remove this frame first, we're guaranteed that its error
                            // method will not be called when we call cancel.
                            pendingWriteQueue.remove();
                            frame.writeComplete();
                        }
                    } finally {
                        // Decrement allocated by how much was actually written.
                        allocated -= initialFrameSize - frame.size();
                    }
                }

                if (!writeOccurred) {
                    // Either there was no frame, or the amount of allocated bytes has been exhausted.
                    return -1;
                }

            } catch (Throwable t) {
                // Mark the state as cancelled, we'll clear the pending queue via cancel() below.
                cancelled = true;
                cause = t;
            } finally {
                writing = false;
                // Make sure we always decrement the flow control windows
                // by the bytes written.
                writtenBytes = initialAllocated - allocated;

                decrementPendingBytes(writtenBytes, false);
                decrementFlowControlWindow(writtenBytes);

                // If a cancellation occurred while writing, call cancel again to
                // clear and error all of the pending writes.
                if (cancelled) {
                    cancel(cause);
                }
            }
            return writtenBytes;
        }

        /**
         * Increments the flow control window for this stream by the given delta and returns the new value.
         */
        int incrementStreamWindow(int delta) throws Http2Exception {
            if (delta > 0 && Integer.MAX_VALUE - delta < window) {
                throw streamError(stream.id(), FLOW_CONTROL_ERROR,
                        "Window size overflow for stream: %d", stream.id());
            }
            window += delta;

            streamByteDistributor.updateStreamableBytes(this);
            return window;
        }

        /**
         * Returns the maximum writable window (minimum of the stream and connection windows).
         */
        private int writableWindow() {
            return min(window, connectionWindowSize());
        }

        @Override
        public int pendingBytes() {
            return pendingBytes;
        }

        /**
         * Adds the {@code frame} to the pending queue and increments the pending byte count.
         */
        void enqueueFrame(FlowControlled frame) {
            FlowControlled last = pendingWriteQueue.peekLast();
            if (last == null) {
                enqueueFrameWithoutMerge(frame);
                return;
            }

            int lastSize = last.size();
            if (last.merge(ctx, frame)) {
                incrementPendingBytes(last.size() - lastSize, true);
                return;
            }
            enqueueFrameWithoutMerge(frame);
        }

        private void enqueueFrameWithoutMerge(FlowControlled frame) {
            pendingWriteQueue.offer(frame);
            // This must be called after adding to the queue in order so that hasFrame() is
            // updated before updating the stream state.
            incrementPendingBytes(frame.size(), true);
        }

        @Override
        public boolean hasFrame() {
            return !pendingWriteQueue.isEmpty();
        }

        /**
         * Returns the the head of the pending queue, or {@code null} if empty.
         */
        private FlowControlled peek() {
            return pendingWriteQueue.peek();
        }

        /**
         * Any operations that may be pending are cleared and the status of these operations is failed.
         */
        void cancel() {
            cancel(null);
        }

        /**
         * Clears the pending queue and writes errors for each remaining frame.
         * @param cause the {@link Throwable} that caused this method to be invoked.
         */
        private void cancel(Throwable cause) {
            cancelled = true;
            // Ensure that the queue can't be modified while we are writing.
            if (writing) {
                return;
            }

            for (;;) {
                FlowControlled frame = pendingWriteQueue.poll();
                if (frame == null) {
                    break;
                }
                writeError(frame, streamError(stream.id(), INTERNAL_ERROR, cause,
                                              "Stream closed before write could take place"));
            }

            streamByteDistributor.updateStreamableBytes(this);

            isWritableSupplier = BooleanSupplier.FALSE_SUPPLIER;
            monitor.stateCancelled(this);
        }

        /**
         * Increments the number of pending bytes for this node and optionally updates the
         * {@link StreamByteDistributor}.
         */
        private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) {
            pendingBytes += numBytes;
            monitor.incrementPendingBytes(numBytes);
            if (updateStreamableBytes) {
                streamByteDistributor.updateStreamableBytes(this);
            }
        }

        /**
         * If this frame is in the pending queue, decrements the number of pending bytes for the stream.
         */
        private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) {
            incrementPendingBytes(-bytes, updateStreamableBytes);
        }

        /**
         * Decrement the per stream and connection flow control window by {@code bytes}.
         */
        private void decrementFlowControlWindow(int bytes) {
            try {
                int negativeBytes = -bytes;
                connectionState.incrementStreamWindow(negativeBytes);
                incrementStreamWindow(negativeBytes);
            } catch (Http2Exception e) {
                // Should never get here since we're decrementing.
                throw new IllegalStateException("Invalid window state when writing frame: " + e.getMessage(), e);
            }
        }

        /**
         * Discards this {@link FlowControlled}, writing an error. If this frame is in the pending queue,
         * the unwritten bytes are removed from this branch of the priority tree.
         */
        private void writeError(FlowControlled frame, Http2Exception cause) {
            assert ctx != null;
            decrementPendingBytes(frame.size(), true);
            frame.error(ctx, cause);
        }
    }

    /**
     * Abstract class which provides common functionality for writability monitor implementations.
     */
    private class WritabilityMonitor {
        private long totalPendingBytes;
        private final Writer writer = new StreamByteDistributor.Writer() {
            @Override
            public void write(Http2Stream stream, int numBytes) {
                state(stream).writeAllocatedBytes(numBytes);
            }
        };

        /**
         * Called when the writability of the underlying channel changes.
         * @throws Http2Exception If a write occurs and an exception happens in the write operation.
         */
        void channelWritabilityChange() throws Http2Exception { }

        /**
         * Called when the state is cancelled.
         * @param state the state that was cancelled.
         */
        void stateCancelled(FlowState state) { }

        /**
         * Set the initial window size for {@code state}.
         * @param state the state to change the initial window size for.
         * @param initialWindowSize the size of the window in bytes.
         */
        void windowSize(FlowState state, int initialWindowSize) {
            state.windowSize(initialWindowSize);
        }

        /**
         * Increment the window size for a particular stream.
         * @param state the state associated with the stream whose window is being incremented.
         * @param delta The amount to increment by.
         * @throws Http2Exception If this operation overflows the window for {@code state}.
         */
        void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
            state.incrementStreamWindow(delta);
        }

        /**
         * Add a frame to be sent via flow control.
         * @param state The state associated with the stream which the {@code frame} is associated with.
         * @param frame the frame to enqueue.
         * @throws Http2Exception If a writability error occurs.
         */
        void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
            state.enqueueFrame(frame);
        }

        /**
         * Increment the total amount of pending bytes for all streams. When any stream's pending bytes changes
         * method should be called.
         * @param delta The amount to increment by.
         */
        final void incrementPendingBytes(int delta) {
            totalPendingBytes += delta;

            // Notification of writibilty change should be delayed until the end of the top level event.
            // This is to ensure the flow controller is more consistent state before calling external listener methods.
        }

        /**
         * Determine if the stream associated with {@code state} is writable.
         * @param state The state which is associated with the stream to test writability for.
         * @return {@code true} if {@link FlowState#stream()} is writable. {@code false} otherwise.
         */
        final boolean isWritable(FlowState state) {
            return isWritableConnection() && state.isWritable();
        }

        final void writePendingBytes() throws Http2Exception {
            int bytesToWrite = writableBytes();

            // Make sure we always write at least once, regardless if we have bytesToWrite or not.
            // This ensures that zero-length frames will always be written.
            for (;;) {
                if (!streamByteDistributor.distribute(bytesToWrite, writer) ||
                    (bytesToWrite = writableBytes()) <= 0 ||
                    !isChannelWritable0()) {
                    break;
                }
            }
        }

        void initialWindowSize(int newWindowSize) throws Http2Exception {
            if (newWindowSize < 0) {
                throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
            }

            final int delta = newWindowSize - initialWindowSize;
            initialWindowSize = newWindowSize;
            connection.forEachActiveStream(new Http2StreamVisitor() {
                @Override
                public boolean visit(Http2Stream stream) throws Http2Exception {
                    state(stream).incrementStreamWindow(delta);
                    return true;
                }
            });

            if (delta > 0 && isChannelWritable()) {
                // The window size increased, send any pending frames for all streams.
                writePendingBytes();
            }
        }

        final boolean isWritableConnection() {
            return connectionState.windowSize() - totalPendingBytes > 0 && isChannelWritable();
        }
    }

    /**
     * Writability of a {@code stream} is calculated using the following:
     * <pre>
     * Connection Window - Total Queued Bytes > 0 &&
     * Stream Window - Bytes Queued for Stream > 0 &&
     * isChannelWritable()
     * </pre>
     */
    private final class ListenerWritabilityMonitor extends WritabilityMonitor {
        private final Listener listener;
        private final Http2StreamVisitor checkStreamWritabilityVisitor = new Http2StreamVisitor() {
            @Override
            public boolean visit(Http2Stream stream) throws Http2Exception {
                FlowState state = state(stream);
                if (isWritable(state) != state.markedWritability()) {
                    notifyWritabilityChanged(state);
                }
                return true;
            }
        };

        ListenerWritabilityMonitor(Listener listener) {
            this.listener = listener;
        }

        @Override
        void windowSize(FlowState state, int initialWindowSize) {
            super.windowSize(state, initialWindowSize);
            try {
                checkStateWritability(state);
            } catch (Http2Exception e) {
                throw new RuntimeException("Caught unexpected exception from window", e);
            }
        }

        @Override
        void incrementWindowSize(FlowState state, int delta) throws Http2Exception {
            super.incrementWindowSize(state, delta);
            checkStateWritability(state);
        }

        @Override
        void initialWindowSize(int newWindowSize) throws Http2Exception {
            super.initialWindowSize(newWindowSize);
            if (isWritableConnection()) {
                // If the write operation does not occur we still need to check all streams because they
                // may have transitioned from writable to not writable.
                checkAllWritabilityChanged();
            }
        }

        @Override
        void enqueueFrame(FlowState state, FlowControlled frame) throws Http2Exception {
            super.enqueueFrame(state, frame);
            checkConnectionThenStreamWritabilityChanged(state);
        }

        @Override
        void stateCancelled(FlowState state) {
            try {
                checkConnectionThenStreamWritabilityChanged(state);
            } catch (Http2Exception e) {
                throw new RuntimeException("Caught unexpected exception from checkAllWritabilityChanged", e);
            }
        }

        @Override
        void channelWritabilityChange() throws Http2Exception {
            if (connectionState.markedWritability() != isChannelWritable()) {
                checkAllWritabilityChanged();
            }
        }

        private void checkStateWritability(FlowState state) throws Http2Exception {
            if (isWritable(state) != state.markedWritability()) {
                if (state == connectionState) {
                    checkAllWritabilityChanged();
                } else {
                    notifyWritabilityChanged(state);
                }
            }
        }

        private void notifyWritabilityChanged(FlowState state) {
            state.markedWritability(!state.markedWritability());
            try {
                listener.writabilityChanged(state.stream);
            } catch (Throwable cause) {
                logger.error("Caught Throwable from listener.writabilityChanged", cause);
            }
        }

        private void checkConnectionThenStreamWritabilityChanged(FlowState state) throws Http2Exception {
            // It is possible that the connection window and/or the individual stream writability could change.
            if (isWritableConnection() != connectionState.markedWritability()) {
                checkAllWritabilityChanged();
            } else if (isWritable(state) != state.markedWritability()) {
                notifyWritabilityChanged(state);
            }
        }

        private void checkAllWritabilityChanged() throws Http2Exception {
            // Make sure we mark that we have notified as a result of this change.
            connectionState.markedWritability(isWritableConnection());
            connection.forEachActiveStream(checkStreamWritabilityVisitor);
        }
    }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java DefaultHttp2RemoteFlowController.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.