alvinalexander.com | career | drupal | java | mac | mysql | perl | scala | uml | unix  

Java example source code file (RedisDecoder.java)

This example Java source code file (RedisDecoder.java) is included in the alvinalexander.com "Java Source Code Warehouse" project. The intent of this project is to help you "Learn Java by Example" TM.

Learn more about this Java project at its project page.

Java - Java tags/keywords

array_header, bulk_string, bytebuf, decode_bulk_string_eol, decode_length, decode_type, exception, integer, list, rediscodecexception, redisdecoder, redismessagepool, state, topositivelongprocessor, util

The RedisDecoder.java Java example source code

/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License, version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at:
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package io.netty.handler.codec.redis;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.UnstableApi;

import java.util.List;

/**
 * Decodes the Redis protocol into {@link RedisMessage} objects following
 * <a href="http://redis.io/topics/protocol">RESP (REdis Serialization Protocol).
 *
 * {@link RedisMessage} parts can be aggregated to {@link RedisMessage} using
 * {@link RedisArrayAggregator} or processed directly.
 */
@UnstableApi
public final class RedisDecoder extends ByteToMessageDecoder {

    private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();

    private final int maxInlineMessageLength;
    private final RedisMessagePool messagePool;

    // current decoding states
    private State state = State.DECODE_TYPE;
    private RedisMessageType type;
    private int remainingBulkLength;

    private enum State {
        DECODE_TYPE,
        DECODE_INLINE, // SIMPLE_STRING, ERROR, INTEGER
        DECODE_LENGTH, // BULK_STRING, ARRAY_HEADER
        DECODE_BULK_STRING_EOL,
        DECODE_BULK_STRING_CONTENT,
    }

    /**
     * Creates a new instance with default {@code maxInlineMessageLength} and {@code messageaPool}.
     */
    public RedisDecoder() {
        // 1024 * 64 is max inline length of current Redis server implementation.
        this(1024 * 64, FixedRedisMessagePool.INSTANCE);
    }

    /**
     * Creates a new instance.
     * @param maxInlineMessageLength the maximum length of inline message.
     * @param messagePool the predefined message pool.
     */
    public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
        if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
            throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
                                          " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
        }
        this.maxInlineMessageLength = maxInlineMessageLength;
        this.messagePool = messagePool;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        try {
            for (;;) {
                switch (state) {
                case DECODE_TYPE:
                    if (!decodeType(in)) {
                        return;
                    }
                    break;
                case DECODE_INLINE:
                    if (!decodeInline(in, out)) {
                        return;
                    }
                    break;
                case DECODE_LENGTH:
                    if (!decodeLength(in, out)) {
                        return;
                    }
                    break;
                case DECODE_BULK_STRING_EOL:
                    if (!decodeBulkStringEndOfLine(in, out)) {
                        return;
                    }
                    break;
                case DECODE_BULK_STRING_CONTENT:
                    if (!decodeBulkStringContent(in, out)) {
                        return;
                    }
                    break;
                default:
                    throw new RedisCodecException("Unknown state: " + state);
                }
            }
        } catch (RedisCodecException e) {
            resetDecoder();
            throw e;
        } catch (Exception e) {
            resetDecoder();
            throw new RedisCodecException(e);
        }
    }

    private void resetDecoder() {
        state = State.DECODE_TYPE;
        remainingBulkLength = 0;
    }

    private boolean decodeType(ByteBuf in) throws Exception {
        if (!in.isReadable()) {
            return false;
        }
        type = RedisMessageType.valueOf(in.readByte());
        state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
        return true;
    }

    private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception {
        ByteBuf lineBytes = readLine(in);
        if (lineBytes == null) {
            if (in.readableBytes() > maxInlineMessageLength) {
                throw new RedisCodecException("length: " + in.readableBytes() +
                                              " (expected: <= " + maxInlineMessageLength + ")");
            }
            return false;
        }
        out.add(newInlineRedisMessage(type, lineBytes));
        resetDecoder();
        return true;
    }

    private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception {
        ByteBuf lineByteBuf = readLine(in);
        if (lineByteBuf == null) {
            return false;
        }
        final long length = parseRedisNumber(lineByteBuf);
        if (length < RedisConstants.NULL_VALUE) {
            throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
        }
        switch (type) {
        case ARRAY_HEADER:
            out.add(new ArrayHeaderRedisMessage(length));
            resetDecoder();
            return true;
        case BULK_STRING:
            if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
                throw new RedisCodecException("length: " + length + " (expected: <= " +
                                              RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
            }
            remainingBulkLength = (int) length; // range(int) is already checked.
            return decodeBulkString(in, out);
        default:
            throw new RedisCodecException("bad type: " + type);
        }
    }

    private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception {
        switch (remainingBulkLength) {
        case RedisConstants.NULL_VALUE: // $-1\r\n
            out.add(FullBulkStringRedisMessage.NULL_INSTANCE);
            resetDecoder();
            return true;
        case 0:
            state = State.DECODE_BULK_STRING_EOL;
            return decodeBulkStringEndOfLine(in, out);
        default: // expectedBulkLength is always positive.
            out.add(new BulkStringHeaderRedisMessage(remainingBulkLength));
            state = State.DECODE_BULK_STRING_CONTENT;
            return decodeBulkStringContent(in, out);
        }
    }

    // $0\r\n <here> \r\n
    private boolean decodeBulkStringEndOfLine(ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < RedisConstants.EOL_LENGTH) {
            return false;
        }
        readEndOfLine(in);
        out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE);
        resetDecoder();
        return true;
    }

    // ${expectedBulkLength}\r\n <here> {data...}\r\n
    private boolean decodeBulkStringContent(ByteBuf in, List<Object> out) throws Exception {
        final int readableBytes = in.readableBytes();
        if (readableBytes == 0) {
            return false;
        }

        // if this is last frame.
        if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) {
            ByteBuf content = in.readSlice(remainingBulkLength);
            readEndOfLine(in);
            // Only call retain after readEndOfLine(...) as the method may throw an exception.
            out.add(new DefaultLastBulkStringRedisContent(content.retain()));
            resetDecoder();
            return true;
        }

        // chunked write.
        int toRead = Math.min(remainingBulkLength, readableBytes);
        remainingBulkLength -= toRead;
        out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain()));
        return true;
    }

    private static void readEndOfLine(final ByteBuf in) {
        final short delim = in.readShort();
        if (RedisConstants.EOL_SHORT == delim) {
            return;
        }
        final byte[] bytes = RedisCodecUtil.shortToBytes(delim);
        throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)");
    }

    private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
        switch (messageType) {
        case SIMPLE_STRING: {
            SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
            return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));
        }
        case ERROR: {
            ErrorRedisMessage cached = messagePool.getError(content);
            return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8));
        }
        case INTEGER: {
            IntegerRedisMessage cached = messagePool.getInteger(content);
            return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content));
        }
        default:
            throw new RedisCodecException("bad type: " + messageType);
        }
    }

    private static ByteBuf readLine(ByteBuf in) {
        if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
            return null;
        }
        final int lfIndex = in.forEachByte(ByteProcessor.FIND_LF);
        if (lfIndex < 0) {
            return null;
        }
        ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1); // `-1` is for CR
        readEndOfLine(in); // validate CR LF
        return data;
    }

    private long parseRedisNumber(ByteBuf byteBuf) {
        final int readableBytes = byteBuf.readableBytes();
        final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-';
        final int extraOneByteForNegative = negative ? 1 : 0;
        if (readableBytes <= extraOneByteForNegative) {
            throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII));
        }
        if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) {
            throw new RedisCodecException("too many characters to be a valid RESP Integer: " +
                                          byteBuf.toString(CharsetUtil.US_ASCII));
        }
        if (negative) {
            return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative));
        }
        return parsePositiveNumber(byteBuf);
    }

    private long parsePositiveNumber(ByteBuf byteBuf) {
        toPositiveLongProcessor.reset();
        byteBuf.forEachByte(toPositiveLongProcessor);
        return toPositiveLongProcessor.content();
    }

    private static final class ToPositiveLongProcessor implements ByteProcessor {
        private long result;

        @Override
        public boolean process(byte value) throws Exception {
            if (value < '0' || value > '9') {
                throw new RedisCodecException("bad byte in number: " + value);
            }
            result = result * 10 + (value - '0');
            return true;
        }

        public long content() {
            return result;
        }

        public void reset() {
            result = 0;
        }
    }
}

Other Java examples (source code examples)

Here is a short list of links related to this Java RedisDecoder.java source code file:

... this post is sponsored by my books ...

#1 New Release!

FP Best Seller

 

new blog posts

 

Copyright 1998-2021 Alvin Alexander, alvinalexander.com
All Rights Reserved.

A percentage of advertising revenue from
pages under the /java/jwarehouse URI on this website is
paid back to open source projects.