alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (WeightedFairQueueByteDistributor.java)

This example Java source code file (WeightedFairQueueByteDistributor.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example"™.

Learn more about this Java project at its project page.

Java - Java tags/keywords

connection_stream_id, http2connectionadapter, http2exception, http2stream, illegalargumentexception, index_not_in_queue, override, priorityqueuenode, state, streambytedistributor, throwable, unstableapi, util, weightedfairqueuebytedistributor, writer

The WeightedFairQueueByteDistributor.java Java example source code

/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package io.netty.handler.codec.http2;

import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PriorityQueue;
import io.netty.util.internal.PriorityQueueNode;
import io.netty.util.internal.UnstableApi;

import java.util.Queue;

import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;

/**
 * A {@link StreamByteDistributor} that is sensitive to stream priority and uses a
 * <a href="https://en.wikipedia.org/wiki/Weighted_fair_queueing">Weighted Fair Queueing</a> approach for
 * distributing bytes.
 * <p>
 * Inspiration for this distributor was taken from Linux's
 * <a href="https://git.kernel.org/cgit/linux/kernel/git/stable/linux-stable.git/tree/Documentation/scheduler
 * /sched-design-CFS.txt">Completely Fair Scheduler</a>
 * to model the distribution of bytes to simulate an "ideal multi-tasking CPU", but in this case we are simulating
 * an "ideal multi-tasking NIC".
 * <p>
 * Each write operation will use the {@link #allocationQuantum(int)} to know how many more bytes should be allocated
 * relative to the next stream which wants to write. This is to balance fairness while also considering goodput.
 */
@UnstableApi
public final class WeightedFairQueueByteDistributor implements StreamByteDistributor {
    /**
     * Property key under which each stream's {@link State} is stored.
     */
    private final Http2Connection.PropertyKey stateKey;
    /**
     * The {@link State} attached to the connection stream. All distribution starts from this node.
     */
    private final State connectionState;
    /**
     * The minimum number of bytes that we will attempt to allocate to a stream. This is to
     * help improve goodput on a per-stream basis.
     */
    private int allocationQuantum = 1024;

    /**
     * Creates a distributor for {@code connection}. A {@link State} is attached to the connection stream and a
     * listener is registered so that every stream added later gets its own {@link State}, and so that weight,
     * close and priority tree events are reflected in the parents' queues.
     *
     * @param connection the connection whose streams will be allocated bytes.
     */
    public WeightedFairQueueByteDistributor(Http2Connection connection) {
        stateKey = connection.newKey();
        Http2Stream connectionStream = connection.connectionStream();
        connectionStream.setProperty(stateKey, connectionState = new State(connectionStream, 16));

        // Register for notification of new streams.
        connection.addListener(new Http2ConnectionAdapter() {
            @Override
            public void onStreamAdded(Http2Stream stream) {
                stream.setProperty(stateKey, new State(stream));
            }

            @Override
            public void onWeightChanged(Http2Stream stream, short oldWeight) {
                // Only a stream whose sub tree has active nodes contributes weight to its parent, so only then is
                // the parent's running total adjusted by the weight delta.
                Http2Stream parent;
                if (state(stream).activeCountForTree != 0 && (parent = stream.parent()) != null) {
                    state(parent).totalQueuedWeights += stream.weight() - oldWeight;
                }
            }

            @Override
            public void onStreamClosed(Http2Stream stream) {
                state(stream).close();
            }

            @Override
            public void onPriorityTreeParentChanged(Http2Stream stream, Http2Stream oldParent) {
                // The stream now hangs off its new parent: queue it there and add its active count to the new
                // ancestors.
                Http2Stream parent = stream.parent();
                if (parent != null) {
                    State state = state(stream);
                    if (state.activeCountForTree != 0) {
                        State pState = state(parent);
                        pState.offerAndInitializePseudoTime(state);
                        pState.isActiveCountChangeForTree(state.activeCountForTree);
                    }
                }
            }

            @Override
            public void onPriorityTreeParentChanging(Http2Stream stream, Http2Stream newParent) {
                // The stream is about to leave its current parent: dequeue it there and subtract its active count
                // from the current ancestors.
                Http2Stream parent = stream.parent();
                if (parent != null) {
                    State state = state(stream);
                    if (state.activeCountForTree != 0) {
                        State pState = state(parent);
                        pState.remove(state);
                        pState.isActiveCountChangeForTree(-state.activeCountForTree);
                    }
                }
            }
        });
    }

    /**
     * Records the latest streamable byte count for the stream described by {@code state}. The stream is treated as
     * active when it has a frame and its window size is not negative.
     *
     * @param state the flow control state to read the stream, streamable bytes, frame presence and window from.
     */
    @Override
    public void updateStreamableBytes(StreamState state) {
        state(state.stream()).updateStreamableBytes(streamableBytes(state),
                                                    state.hasFrame() && state.windowSize() >= 0);
    }

    /**
     * Distributes up to {@code maxBytes} among the active streams by repeatedly writing to the connection stream's
     * children.
     *
     * @param maxBytes the number of bytes available for distribution.
     * @param writer receives the number of bytes allocated to each stream. Must not be {@code null}.
     * @return {@code true} if there are still active streams after distribution, {@code false} otherwise.
     * @throws Http2Exception if {@code writer} fails while writing.
     */
    @Override
    public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
        checkNotNull(writer, "writer");

        // As long as there is some active frame we should write at least 1 time.
        if (connectionState.activeCountForTree == 0) {
            return false;
        }

        // The goal is to write until we write all the allocated bytes or are no longer making progress.
        // We still attempt to write even after the number of allocated bytes has been exhausted to allow empty frames
        // to be sent. Making progress means the active streams rooted at the connection stream has changed.
        int oldIsActiveCountForTree;
        do {
            oldIsActiveCountForTree = connectionState.activeCountForTree;
            // connectionState will never be active, so go right to its children.
            maxBytes -= distributeToChildren(maxBytes, writer, connectionState);
        } while (connectionState.activeCountForTree != 0 &&
                (maxBytes > 0 || oldIsActiveCountForTree != connectionState.activeCountForTree));

        return connectionState.activeCountForTree != 0;
    }

    /**
     * Sets the amount of bytes that will be allocated to each stream. Defaults to 1KiB.
     * @param allocationQuantum the amount of bytes that will be allocated to each stream. Must be {@code > 0}.
     * @throws IllegalArgumentException if {@code allocationQuantum} is not positive.
     */
    public void allocationQuantum(int allocationQuantum) {
        if (allocationQuantum <= 0) {
            throw new IllegalArgumentException("allocationQuantum must be > 0");
        }
        this.allocationQuantum = allocationQuantum;
    }

    /**
     * Writes to {@code state} if it is active, otherwise delegates to its children.
     *
     * @param maxBytes the upper bound on the number of bytes to allocate.
     * @param writer receives the allocation.
     * @param state the node to allocate to.
     * @return the number of bytes allocated.
     * @throws Http2Exception if {@code writer} fails while writing.
     */
    private int distribute(int maxBytes, Writer writer, State state) throws Http2Exception {
        if (state.active) {
            int nsent = min(maxBytes, state.streamableBytes);
            state.write(nsent, writer);
            if (nsent == 0 && maxBytes != 0) {
                // If a stream sends zero bytes, then we gave it a chance to write empty frames and it is now
                // considered inactive until the next call to updateStreamableBytes. This allows descendant streams to
                // be allocated bytes when the parent stream can't utilize them. This may be as a result of the
                // stream's flow control window being 0.
                state.updateStreamableBytes(state.streamableBytes, false);
            }
            return nsent;
        }

        return distributeToChildren(maxBytes, writer, state);
    }

    /**
     * Polls the child of {@code state} with the smallest pseudo time, lets it (or its descendants) write, and then
     * puts it back in the queue if its sub tree still has active nodes.
     * <p>
     * It is a pre-condition that {@code state.poll()} returns a non-{@code null} value. This is a result of the way
     * the allocation algorithm is structured and can be explained in the following cases:
     * <h3>For the recursive case</h3>
     * If a stream has no children (in the allocation tree) then that node must be active or it will not be in the
     * allocation tree. If a node is active then it will not delegate to children and recursion ends.
     * <h3>For the initial case</h3>
     * We check connectionState.activeCountForTree == 0 before any allocation is done. So if the connection stream
     * has no active children we don't get into this method.
     *
     * @param maxBytes the upper bound on the number of bytes to allocate.
     * @param writer receives the allocation.
     * @param state the parent whose children should be allocated bytes.
     * @return the number of bytes allocated.
     * @throws Http2Exception if {@code writer} fails while writing.
     */
    private int distributeToChildren(int maxBytes, Writer writer, State state) throws Http2Exception {
        long oldTotalQueuedWeights = state.totalQueuedWeights;
        State childState = state.poll();
        State nextChildState = state.peek();
        // The child is out of the queue until the finally block below. The write is an external callback that may
        // re-enter updateStreamableBytes; the flag stops isActiveCountChangeForTree from queueing the child in the
        // meantime, which would otherwise result in it being offered twice.
        childState.distributing = true;
        try {
            assert nextChildState == null || nextChildState.pseudoTimeToWrite >= childState.pseudoTimeToWrite :
                "nextChildState.pseudoTime(" + nextChildState.pseudoTimeToWrite + ") < " + " childState.pseudoTime(" +
                    childState.pseudoTimeToWrite + ")";
            int nsent = distribute(nextChildState == null ? maxBytes :
                            min(maxBytes, (int) min((nextChildState.pseudoTimeToWrite - childState.pseudoTimeToWrite) *
                                                childState.stream.weight() / oldTotalQueuedWeights + allocationQuantum,
                                                Integer.MAX_VALUE)
                               ),
                               writer,
                               childState);
            state.pseudoTime += nsent;
            childState.updatePseudoTime(state, nsent, oldTotalQueuedWeights);
            return nsent;
        } finally {
            childState.distributing = false;
            // Do in finally to ensure the internal state is not corrupted if an exception is thrown.
            // The offer operation is delayed until we unroll up the recursive stack, so we don't have to remove from
            // the priority queue due to a write operation.
            if (childState.activeCountForTree != 0) {
                state.offer(childState);
            }
        }
    }

    /**
     * Returns the {@link State} stored on {@code stream} under {@link #stateKey}.
     */
    private State state(Http2Stream stream) {
        return stream.getProperty(stateKey);
    }

    /**
     * For testing only!
     */
    int streamableBytes0(Http2Stream stream) {
        return state(stream).streamableBytes;
    }

    /**
     * The remote flow control state for a single stream.
     */
    private final class State implements PriorityQueueNode<State> {
        final Http2Stream stream;
        /**
         * The children of this node which have active nodes in their sub tree, ordered by
         * {@link #pseudoTimeToWrite}.
         */
        private final Queue<State> queue;
        int streamableBytes;
        /**
         * Count of nodes rooted at this sub tree with {@link #active} equal to {@code true}.
         */
        int activeCountForTree;
        private int priorityQueueIndex = INDEX_NOT_IN_QUEUE;
        /**
         * An estimate of when this node should be given the opportunity to write data.
         */
        long pseudoTimeToWrite;
        /**
         * A pseudo time maintained for immediate children to base their {@link #pseudoTimeToWrite} off of.
         */
        long pseudoTime;
        /**
         * Sum of the weights of the children currently in {@link #queue}.
         */
        long totalQueuedWeights;
        boolean active;
        /**
         * {@code true} while this node has been polled from its parent's queue by
         * {@code distributeToChildren} and has not been offered back yet.
         */
        boolean distributing;

        State(Http2Stream stream) {
            this(stream, 0);
        }

        State(Http2Stream stream, int initialSize) {
            this.stream = stream;
            queue = new PriorityQueue<State>(initialSize);
        }

        /**
         * Hands {@code numBytes} for this stream to {@code writer}. Anything thrown by the writer is wrapped in a
         * connection level {@code INTERNAL_ERROR}.
         */
        void write(int numBytes, Writer writer) throws Http2Exception {
            try {
                writer.write(stream, numBytes);
            } catch (Throwable t) {
                throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
            }
        }

        /**
         * Adds {@code increment} to {@link #activeCountForTree} for this node and every ancestor. A non-root node
         * is removed from its parent's queue when its count reaches zero, and is queued in its parent when its
         * count leaves zero, unless it is currently {@link #distributing}.
         */
        void isActiveCountChangeForTree(int increment) {
            assert activeCountForTree + increment >= 0;
            activeCountForTree += increment;
            if (!stream.isRoot()) {
                State pState = state(stream.parent());
                if (activeCountForTree == 0) {
                    pState.remove(this);
                } else if (activeCountForTree - increment == 0 && !distributing) {
                    // If frame count was 0 but is now not, then queue. A node that is being distributed to is
                    // skipped: it is already out of the queue and distributeToChildren offers it back once the
                    // write completes, so queueing it here as well would add it a second time.
                    pState.offerAndInitializePseudoTime(this);
                }
                pState.isActiveCountChangeForTree(increment);
            }
        }

        /**
         * Stores the new streamable byte count and, if the active flag changed, propagates the change to
         * {@link #activeCountForTree} of this node and its ancestors.
         */
        void updateStreamableBytes(int newStreamableBytes, boolean isActive) {
            if (this.active != isActive) {
                isActiveCountChangeForTree(isActive ? 1 : -1);
                this.active = isActive;
            }

            streamableBytes = newStreamableBytes;
        }

        /**
         * Advances {@link #pseudoTimeToWrite} after {@code nsent} bytes were written.
         * Assumes the parents {@link #totalQueuedWeights} includes this node's weight.
         */
        void updatePseudoTime(State parentState, int nsent, long totalQueuedWeights) {
            assert stream.id() != CONNECTION_STREAM_ID && nsent >= 0;
            // If the current pseudoTimeToWrite is greater than parentState.pseudoTime then we previously over
            // accounted and should use parentState.pseudoTime.
            pseudoTimeToWrite = min(pseudoTimeToWrite, parentState.pseudoTime) +
                                nsent * totalQueuedWeights / stream.weight();
        }

        /**
         * The concept of pseudoTime can be influenced by priority tree manipulations or if a stream goes from "active"
         * to "non-active". This method accounts for that by initializing the {@link #pseudoTimeToWrite} for
         * {@code state} to {@link #pseudoTime} of this node and then calls {@link #offer(State)}.
         */
        void offerAndInitializePseudoTime(State state) {
            state.pseudoTimeToWrite = pseudoTime;
            offer(state);
        }

        /**
         * Adds {@code state} to this node's queue and adds its weight to {@link #totalQueuedWeights}.
         */
        void offer(State state) {
            queue.offer(state);
            totalQueuedWeights += state.stream.weight();
        }

        /**
         * Removes the head of this node's queue and subtracts its weight from {@link #totalQueuedWeights}.
         * Must only be called if the queue is non-empty!
         */
        State poll() {
            State state = queue.poll();
            // This method is only ever called if the queue is non-empty.
            totalQueuedWeights -= state.stream.weight();
            return state;
        }

        /**
         * Removes {@code state} from this node's queue. {@link #totalQueuedWeights} is only reduced if the queue
         * reports that the element was removed.
         */
        void remove(State state) {
            if (queue.remove(state)) {
                totalQueuedWeights -= state.stream.weight();
            }
        }

        /**
         * Returns the head of this node's queue without removing it, or {@code null} if the queue is empty.
         */
        State peek() {
            return queue.peek();
        }

        /**
         * Marks this stream as inactive with no streamable bytes.
         */
        void close() {
            updateStreamableBytes(0, false);
        }

        /**
         * Orders nodes by {@link #pseudoTimeToWrite}.
         */
        @Override
        public int compareTo(State o) {
            return MathUtil.compare(pseudoTimeToWrite, o.pseudoTimeToWrite);
        }

        @Override
        public int priorityQueueIndex() {
            return priorityQueueIndex;
        }

        @Override
        public void priorityQueueIndex(int i) {
            priorityQueueIndex = i;
        }
    }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java WeightedFairQueueByteDistributor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.