alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (UniformStreamByteDistributor.java)

This example Java source code file (UniformStreamByteDistributor.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

arraydeque, default_min_allocation_chunk, deque, http2connectionadapter, http2exception, http2stream, illegalargumentexception, override, state, throwable, uniformstreambytedistributor, unstableapi, util, writer

The UniformStreamByteDistributor.java Java example source code

/*
 * Copyright 2015 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package io.netty.handler.codec.http2;

import io.netty.util.internal.UnstableApi;

import java.util.ArrayDeque;
import java.util.Deque;

import static io.netty.handler.codec.http2.Http2CodecUtil.streamableBytes;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;

/**
 * A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
 * streams. This class uses a minimum chunk size that will be allocated to each stream. While
 * fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
 * should improve the goodput on each written stream.
 */
@UnstableApi
public final class UniformStreamByteDistributor implements StreamByteDistributor {
    static final int DEFAULT_MIN_ALLOCATION_CHUNK = 1024;

    private final Http2Connection.PropertyKey stateKey;
    private final Deque<State> queue = new ArrayDeque(4);

    /**
     * The minimum number of bytes that we will attempt to allocate to a stream. This is to
     * help improve goodput on a per-stream basis.
     */
    private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
    private long totalStreamableBytes;

    public UniformStreamByteDistributor(Http2Connection connection) {
        // Add a state for the connection.
        stateKey = connection.newKey();
        Http2Stream connectionStream = connection.connectionStream();
        connectionStream.setProperty(stateKey, new State(connectionStream));

        // Register for notification of new streams.
        connection.addListener(new Http2ConnectionAdapter() {
            @Override
            public void onStreamAdded(Http2Stream stream) {
                stream.setProperty(stateKey, new State(stream));
            }

            @Override
            public void onStreamClosed(Http2Stream stream) {
                state(stream).close();
            }
        });
    }

    /**
     * Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
     *
     * @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
     * Must be > 0.
     */
    public void minAllocationChunk(int minAllocationChunk) {
        if (minAllocationChunk <= 0) {
            throw new IllegalArgumentException("minAllocationChunk must be > 0");
        }
        this.minAllocationChunk = minAllocationChunk;
    }

    @Override
    public void updateStreamableBytes(StreamState streamState) {
        state(streamState.stream()).updateStreamableBytes(streamableBytes(streamState),
                                                          streamState.hasFrame(),
                                                          streamState.windowSize());
    }

    @Override
    public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
        checkNotNull(writer, "writer");

        final int size = queue.size();
        if (size == 0) {
            return totalStreamableBytes > 0;
        }

        final int chunkSize = max(minAllocationChunk, maxBytes / size);

        State state = queue.pollFirst();
        do {
            state.enqueued = false;
            if (state.windowNegative) {
                continue;
            }
            if (maxBytes == 0 && state.streamableBytes > 0) {
                // Stop at the first state that can't send. Add this state back to the head of the queue. Note
                // that empty frames at the head of the queue will always be written, assuming the stream window
                // is not negative.
                queue.addFirst(state);
                state.enqueued = true;
                break;
            }

            // Allocate as much data as we can for this stream.
            int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
            maxBytes -= chunk;

            // Write the allocated bytes and enqueue as necessary.
            state.write(chunk, writer);
        } while ((state = queue.pollFirst()) != null);

        return totalStreamableBytes > 0;
    }

    private State state(Http2Stream stream) {
        return checkNotNull(stream, "stream").getProperty(stateKey);
    }

    /**
     * For testing only!
     */
    int streamableBytes0(Http2Stream stream) {
        return state(stream).streamableBytes;
    }

    /**
     * The remote flow control state for a single stream.
     */
    private final class State {
        final Http2Stream stream;
        int streamableBytes;
        boolean windowNegative;
        boolean enqueued;
        boolean writing;

        State(Http2Stream stream) {
            this.stream = stream;
        }

        void updateStreamableBytes(int newStreamableBytes, boolean hasFrame, int windowSize) {
            assert hasFrame || newStreamableBytes == 0 :
                "hasFrame: " + hasFrame + " newStreamableBytes: " + newStreamableBytes;

            int delta = newStreamableBytes - streamableBytes;
            if (delta != 0) {
                streamableBytes = newStreamableBytes;
                totalStreamableBytes += delta;
            }
            // In addition to only enqueuing state when they have frames we enforce the following restrictions:
            // 1. If the window has gone negative. We never want to queue a state. However we also don't want to
            //    Immediately remove the item if it is already queued because removal from dequeue is O(n). So
            //    we allow it to stay queued and rely on the distribution loop to remove this state.
            // 2. If the window is zero we only want to queue if we are not writing. If we are writing that means
            //    we gave the state a chance to write zero length frames. We wait until updateStreamableBytes is
            //    called again before this state is allowed to write.
            windowNegative = windowSize < 0;
            if (hasFrame && (windowSize > 0 || (windowSize == 0 && !writing))) {
                addToQueue();
            }
        }

        /**
         * Write any allocated bytes for the given stream and updates the streamable bytes,
         * assuming all of the bytes will be written.
         */
        void write(int numBytes, Writer writer) throws Http2Exception {
            writing = true;
            try {
                // Write the allocated bytes.
                writer.write(stream, numBytes);
            } catch (Throwable t) {
                throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
            } finally {
                writing = false;
            }
        }

        void addToQueue() {
            if (!enqueued) {
                enqueued = true;
                queue.addLast(this);
            }
        }

        void removeFromQueue() {
            if (enqueued) {
                enqueued = false;
                queue.remove(this);
            }
        }

        void close() {
            // Remove this state from the queue.
            removeFromQueue();

            // Clear the streamable bytes.
            updateStreamableBytes(0, false, 0);
        }
    }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java UniformStreamByteDistributor.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.