IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    [原]用Netty解析Redis网络协议

    dc_726发表于 2015-06-19 21:45:40
    love 0

    用Netty解析Redis网络协议

    根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!

    1.RESP协议

    Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据。RESP的设计权衡了实现简单、解析快速、人类可读这三个因素。Redis客户端通过RESP序列化整数、字符串、数据等数据类型,发送字符串数组表示参数的命令到服务端。服务端根据不同的请求命令响应不同的数据类型。除了管道和订阅外,Redis客户端和服务端都是以这种简单的请求-响应模型通信的。

    具体来看,RESP支持五种数据类型。以”*”消息头标识总长度,消息内部还可能有”$”标识字符串长度,每行以\r\n结束:

    • 简单字符串(Simple String):以”+”开头,表示正确的状态信息,”+”后就是具体信息。许多Redis命令使用简单字符串作为成功的响应,例如”+OK\r\n”。但简单字符串因为不像Bulk String那样有长度信息,而只能靠\r\n确定是否结束,所以 Simple String不是二进制安全的,即字符串里不能包含\r\n。
    • 错误(Error):以”-“开头,表示错误的状态信息,”-“后就是具体信息。
    • 整数(Integer):以”:”开头,像SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD都返回整数。
    • 批量字符串(Bulk String):以”$”开头,表示下一行的字符串长度,具体字符串在下一行中,字符串最大能达到512MB。”$-1\r\n”叫做Null Bulk String,表示没有数据存在。
    • 数组(Array):以”*”开头,表示消息体总共有多少行(不包括当前行),”*”是具体行数。客户端用RESP数组表示命令发送到服务端,反过来服务端也可以用RESP数组返回数据的集合给客户端。数组可以是混合数据类型,例如一个整数加一个字符串”*2\r\n:1\r\n$6\r\nfoobar\r\n”。另外,嵌套数组也是可以的。

    例如,观察下面命令对应的RESP,这一组set/get也正是我们要在Netty里实现的:

    set name helloworld
    ->
    *3\r\n
    $3\r\n
    set\r\n
    $4\r\n
    name\r\n
    $10\r\n
    helloworld\r\n
    <-
    :1\r\n
    
    get name
    ->
    *2\r\n
    $3\r\n
    get\r\n
    $4\r\n
    name\r\n
    <-
    $10\r\n
    helloworld\r\n
    
    set name abc111
    ->
    *3\r\n
    $3\r\n
    set\r\n
    $4\r\n
    name\r\n
    $6\r\n
    abc111\r\n
    <-
    :0\r\n
    
    get age
    ->
    *2\r\n
    $3\r\n
    get\r\n
    $3\r\n
    age\r\n
    <-
    :-1\r\n

    2.用Netty解析协议

    下面就用高性能的网络通信框架Netty实现一个简单的Redis服务器后端,解析set和get命令,并保存键值对。

    2.1 Netty版本

    Netty版本,5.0还处于alpha,使用Final版里最新的。但即便是4.0.25.Final竟然也跟4.0的前几个版本有些不同,网上一些例子中用的API根本就找不到了。Netty的API改得有点太“任性”了吧?:)

            <dependency>
                <groupId>io.nettygroupId>
                <artifactId>netty-allartifactId>
                <version>4.0.25.Finalversion>
            dependency>

    2.2 启动服务

    Netty服务器启动代码,这套代码应该是Netty 4里的标准模板了,具体细节就不在本文赘述了。主要关注我们注册的几个Handler。Netty中Handler分为Inbound和Outbound,RedisCommandDecoder和RedisCommandHandler是Inbound,RedisCommandDecoder是Outbound:

    • RedisCommandDecoder:解析Redis协议,将字节数组转为Command对象。
    • RedisReplyEncoder:将响应写入到输出流中,返回给客户端。
    • RedisCommandHandler:执行Command中的命令。
    public class Main {
    
        public static void main(String[] args) throws Exception {
            new Main().start(6379);
        }
    
        public void start(int port) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap()
                        .group(group)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(port)
                        .childHandler(new ChannelInitializer() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline()
                                        .addLast(new RedisCommandDecoder())
                                        .addLast(new RedisReplyEncoder())
                                        .addLast(new RedisCommandHandler());
                            }
                        });
    
                // Bind and start to accept incoming connections.
                ChannelFuture f = b.bind(port).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shutdown the EventLoopGroup, which releases all resources.
                group.shutdownGracefully();
            }
        }
    
    }

    2.3 协议解析

    RedisCommandDecoder开始时cmds是null,进入doDecodeNumOfArgs先解析出命令和参数的个数,并初始化cmds。之后就会进入doDecodeArgs逐一解析命令名和参数了。当最后完成时,会根据解析结果创建出RedisCommand对象,并加入到out列表里。这样下一个handler就能继续处理了。

    public class RedisCommandDecoder extends ReplayingDecoder<Void> {
    
        /** Decoded command and arguments */
        private byte[][] cmds;
    
        /** Current argument */
        private int arg;
    
        /** Decode in block-io style, rather than nio. */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
            if (cmds == null) {
                if (in.readByte() == '*') {
                    doDecodeNumOfArgs(in);
                }
            } else {
                doDecodeArgs(in);
            }
    
            if (isComplete()) {
                doSendCmdToHandler(out);
                doCleanUp();
            }
        }
    
        /** Decode number of arguments */
        private void doDecodeNumOfArgs(ByteBuf in) {
            // Ignore negative case
            int numOfArgs = readInt(in);
            System.out.println("RedisCommandDecoder NumOfArgs: " + numOfArgs);
            cmds = new byte[numOfArgs][];
    
            checkpoint();
        }
    
        /** Decode arguments */
        private void doDecodeArgs(ByteBuf in) {
            for (int i = arg; i < cmds.length; i++) {
                if (in.readByte() == '$') {
                    int lenOfBulkStr = readInt(in);
                    System.out.println("RedisCommandDecoder LenOfBulkStr[" + i + "]: " + lenOfBulkStr);
    
                    cmds[i] = new byte[lenOfBulkStr];
                    in.readBytes(cmds[i]);
    
                    // Skip CRLF(\r\n)
                    in.skipBytes(2);
    
                    arg++;
                    checkpoint();
                } else {
                    throw new IllegalStateException("Invalid argument");
                }
            }
        }
    
        /**
         * cmds != null means header decode complete
         * arg > 0 means arguments decode has begun
         * arg == cmds.length means complete!
         */
        private boolean isComplete() {
            return (cmds != null)
                    && (arg > 0)
                    && (arg == cmds.length);
        }
    
        /** Send decoded command to next handler */
        private void doSendCmdToHandler(List out) {
            System.out.println("RedisCommandDecoder: Send command to next handler");
            if (cmds.length == 2) {
                out.add(new RedisCommand(new String(cmds[0]), cmds[1]));
            } else if (cmds.length == 3) {
                out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2]));
            } else {
                throw new IllegalStateException("Unknown command");
            }
        }
    
        /** Clean up state info */
        private void doCleanUp() {
            this.cmds = null;
            this.arg = 0;
        }
    
        private int readInt(ByteBuf in) {
            int integer = 0;
            char c;
            while ((c = (char) in.readByte()) != '\r') {
                integer = (integer * 10) + (c - '0');
            }
    
            if (in.readByte() != '\n') {
                throw new IllegalStateException("Invalid number");
            }
            return integer;
        }
    
    }

    因为我们只是简单实现set和get命令,所以只可能有一个参数或两个参数:

    public class RedisCommand {
    
        /** Command name */
        private final String name;
    
        /** Optional arguments */
        private byte[] arg1;
        private byte[] arg2;
    
        public RedisCommand(String name, byte[] arg1) {
            this.name = name;
            this.arg1 = arg1;
        }
    
        public RedisCommand(String name, byte[] arg1, byte[] arg2) {
            this.name = name;
            this.arg1 = arg1;
            this.arg2 = arg2;
        }
    
        public String getName() {
            return name;
        }
    
        public byte[] getArg1() {
            return arg1;
        }
    
        public byte[] getArg2() {
            return arg2;
        }
    
        @Override
        public String toString() {
            return "Command{" +
                    "name='" + name + '\'' +
                    ", arg1=" + Arrays.toString(arg1) +
                    ", arg2=" + Arrays.toString(arg2) +
                    '}';
        }
    }

    2.4 命令执行

    RedisCommandHandler拿到RedisCommand后,根据命令名执行命令。这里用一个HashMap模拟数据库了,set就往Map里放,get就从里面取。除了执行具体操作,还要根据执行结果返回不同的Reply对象:

    • 保存成功:返回:1\r\n。
    • 修改成功:返回:0\r\n。说明之前Map中已存在此Key。
    • 查询成功:返回Bulk String。具体见后面BulkReply。
    • Key不存在:返回:-1\r\n。
    @ChannelHandler.Sharable
    public class RedisCommandHandler extends SimpleChannelInboundHandler<RedisCommand> {
    
        private HashMapbyte[]> database = new HashMapbyte[]>();
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RedisCommand msg) throws Exception {
            System.out.println("RedisCommandHandler: " + msg);
    
            if (msg.getName().equalsIgnoreCase("set")) {
                if (database.put(new String(msg.getArg1()), msg.getArg2()) == null) {
                    ctx.writeAndFlush(new IntegerReply(1));
                } else {
                    ctx.writeAndFlush(new IntegerReply(0));
                }
            }
            else if (msg.getName().equalsIgnoreCase("get")) {
                byte[] value = database.get(new String(msg.getArg1()));
                if (value != null && value.length > 0) {
                    ctx.writeAndFlush(new BulkReply(value));
                } else {
                    ctx.writeAndFlush(BulkReply.NIL_REPLY);
                }
            }
        }
    
    }

    2.5 发送响应

    RedisReplyEncoder实现比较简单,拿到RedisReply消息后,直接写入到ByteBuf中就可以了。具体的写入方法都在各个RedisReply的具体实现中。

    public class RedisReplyEncoder extends MessageToByteEncoder<RedisReply> {
    
        @Override
        protected void encode(ChannelHandlerContext ctx, RedisReply msg, ByteBuf out) throws Exception {
            System.out.println("RedisReplyEncoder: " + msg);
            msg.write(out);
        }
    
    }
    public interface RedisReply<T> {
    
        byte[] CRLF = new byte[] { '\r', '\n' };
    
        T data();
    
        void write(ByteBuf out) throws IOException;
    
    }
    
    public class IntegerReply implements RedisReply<Integer> {
    
        private static final char MARKER = ':';
    
        private final int data;
    
        public IntegerReply(int data) {
            this.data = data;
        }
    
        @Override
        public Integer data() {
            return this.data;
        }
    
        @Override
        public void write(ByteBuf out) throws IOException {
            out.writeByte(MARKER);
            out.writeBytes(String.valueOf(data).getBytes());
            out.writeBytes(CRLF);
        }
    
        @Override
        public String toString() {
            return "IntegerReply{" +
                    "data=" + data +
                    '}';
        }
    
    }
    
    public class BulkReply implements RedisReply<byte[]> {
    
        public static final BulkReply NIL_REPLY = new BulkReply();
    
        private static final char MARKER = '$';
    
        private final byte[] data;
    
        private final int len;
    
        public BulkReply() {
            this.data = null;
            this.len = -1;
        }
    
        public BulkReply(byte[] data) {
            this.data = data;
            this.len = data.length;
        }
    
        @Override
        public byte[] data() {
            return this.data;
        }
    
        @Override
        public void write(ByteBuf out) throws IOException {
            // 1.Write header
            out.writeByte(MARKER);
            out.writeBytes(String.valueOf(len).getBytes());
            out.writeBytes(CRLF);
    
            // 2.Write data
            if (len > 0) {
                out.writeBytes(data);
                out.writeBytes(CRLF);
            }
        }
    
        @Override
        public String toString() {
            return "BulkReply{" +
                    "bytes=" + Arrays.toString(data) +
                    '}';
        }
    }

    2.6 运行测试

    服务端跑起来后,用官方的redis-cli就能连上我们的服务,执行一些命令测试一下。看到自己实现的Redis“伪服务端”能够“骗过”redis-cli,还是很有成就感的!

    127.0.0.1:6379> set name helloworld
    (integer) 1
    127.0.0.1:6379> get name
    "helloworld"
    127.0.0.1:6379> set name abc123
    (integer) 0
    127.0.0.1:6379> get name
    "abc123"
    127.0.0.1:6379> get age
    (nil)

    3.Netty 4中的那些“坑”

    因为是初次使用Netty 4,好多网上的资料都是Netty 3或者Netty 4早期版本的,API都不一样了,所以碰到了不少问题,官方文档里也没找到答案,一点点调试、猜测、看源码才摸出点儿“门道”:

    • Handler的基础类:Netty 4里使用SimpleChannelInboundHandler就可以了,之前的API已经不适用了。
    • Inbound和Outbound处理器间的数据交换:Context对象是数据交换的接口,不同的是:Inbound之间是靠fireChannelRead()进行数据交换,但从Inbound到Outbound就要靠writeAndFlush()触发了。
    • Inbound和Outbound的顺序:fireChannelRead()会向后找下一个Inbound处理器,但writeAndFlush()会向前找前一个Outbound处理器。所以在ChannelInitializer中,Outbound要放在SimpleChannelInboundHandler前面才能进行数据交换。
    • @Sharable注解:如果Handler是无状态的话,可以标这个注解。


沪ICP备19023445号-2号
友情链接