ITKeyword,专注技术干货聚合推荐

注册 | 登录

用Netty解析Redis网络协议

相关推荐:一个 C++ redis 集群管理工具

集群版 redis3.0 发布以来,官方仅提供了一个使用 ruby 写的集群管理工具,在创建 redis 集群时需要使用该工具。因为 ruby 中的一些包依赖问题,导致一些生

用Netty解析Redis网络协议根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!1.RESP协议Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据。RESP的设计权衡了实现简单、解析快速、人类可读这三个因素。Redis客户端通过RESP序列化整数、字符串、数据等数据类型,发送字符串数组表示参数的命令到服务端。服务端根据不同的请求命令响应不同的数据类型。除了管道和订阅外,Redis客户端和服务端都是以这种简单的请求-响应模型通信的。具体来看,RESP支持五种数据类型。以”*”消息头标识总长度,消息内部还可能有”$”标识字符串长度,每行以\r\n结束:简单字符串(Simple String):以”+”开头,表示正确的状态信息,”+”后就是具体信息。许多Redis命令使用简单字符串作为成功的响应,例如”+OK\r\n”。但简单字符串因为不像Bulk String那样有长度信息,而只能靠\r\n确定是否结束,所以 Simple String不是二进制安全的,即字符串里不能包含\r\n。错误(Error):以”-“开头,表示错误的状态信息,”-“后就是具体信息。整数(Integer):以”:”开头,像SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD都返回整数。批量字符串(Bulk String):以”$”开头,表示下一行的字符串长度,具体字符串在下一行中,字符串最大能达到512MB。”$-1\r\n”叫做Null Bulk String,表示没有数据存在。数组(Array):以”*”开头,表示消息体总共有多少行(不包括当前行),”*”是具体行数。客户端用RESP数组表示命令发送到服务端,反过来服务端也可以用RESP数组返回数据的集合给客户端。数组可以是混合数据类型,例如一个整数加一个字符串”*2\r\n:1\r\n$6\r\nfoobar\r\n”。另外,嵌套数组也是可以的。例如,观察下面命令对应的RESP,这一组set/get也正是我们要在Netty里实现的:set name helloworld->*3\r\n$3\r\nset\r\n$4\r\nname\r\n$10\r\nhelloworld\r\n<-:1\r\nget name->*2\r\n$3\r\nget\r\n$4\r\nname\r\n<-$10\r\nhelloworld\r\nset name abc111->*3\r\n$3\r\nset\r\n$4\r\nname\r\n$6\r\nabc111\r\n<-:0\r\nget age->*2\r\n$3\r\nget\r\n$3\r\nage\r\n<-:-1\r\n2.用Netty解析协议下面就用高性能的网络通信框架Netty实现一个简单的Redis服务器后端,解析set和get命令,并保存键值对。2.1 Netty版本Netty版本,5.0还处于alpha,使用Final版里最新的。但即便是4.0.25.Final竟然也跟4.0的前几个版本有些不同,网上一些例子中用的API根本就找不到了。Netty的API改得有点太“任性”了吧?:)

<dependency>

<groupId>io.netty</groupId>

<artifactId>netty-all</artifactId>

<version>4.0.25.Final</version>

</dependency>2.2 启动服务Netty服务器启动代码,这套代码应该是Netty 4里的标准模板了,具体细节就不在本文赘述了。主要关注我们注册的几个Handler。Netty中Handler分为Inbound和Outbound,RedisCommandDecoder和RedisCommandHandler是Inbound,RedisCommandDecoder是Outbound:RedisCommandDecoder:解析Redis协议,将字节数组转为Command对象。RedisReplyEncoder:将响应写入到输出流中,返回给客户端。RedisCommandHandler:执行Command中的命令。public class Main {

public static void main(String[] args) throws Exception {

new Main().start(6379);

}

public void start(int port) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap()

.group(group)

.channel(NioServerSocketChannel.class)

.localAddress(port)

.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline()

.addLast(new RedisCommandDecoder())

.addLast(new RedisReplyEncoder())

.addLast(new RedisCommandHandler());

}

});

// Bind and start to accept incoming connections.

ChannelFuture f = b.bind(port).sync();

// Wait until the server socket is closed.

f.channel().closeFuture().sync();

} finally {

// Shutdown the EventLoopGroup, which releases all resources.

group.shutdownGracefully();

}

}}2.3 协议解析RedisCommandDecoder开始时cmds是null,进入doDecodeNumOfArgs先解析出命令和参数的个数,并初始化cmds。之后就会进入doDecodeArgs逐一解析命令名和参数了。当最后完成时,会根据解析结果创建出RedisCommand对象,并加入到out列表里。这样下一个handler就能继续处理了。public class RedisCommandDecoder extends ReplayingDecoder<Void> {

/** Decoded command and arguments */

private byte[][] cmds;

/** Current argument */

private int arg;

/** Decode in block-io style, rather than nio. */

@Override

protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

if (cmds == null) {

if (in.readByte() == '*') {

doDecodeNumOfArgs(in);

}

} else {

doDecodeArgs(in);

}

if (isComplete()) {

doSendCmdToHandler(out);

doCleanUp();

}

}

/** Decode number of arguments */

private void doDecodeNumOfArgs(ByteBuf in) {

// Ignore negative case

int numOfArgs = readInt(in);

System.out.println("RedisCommandDecoder NumOfArgs: " + numOfArgs);

cmds = new byte[numOfArgs][];

checkpoint();

}

/** Decode arguments */

private void doDecodeArgs(ByteBuf in) {

for (int i = arg; i < cmds.length; i++) {

if (in.readByte() == '$') {

int lenOfBulkStr = readInt(in);

System.out.println("RedisCommandDecoder LenOfBulkStr[" + i + "]: " + lenOfBulkStr);

cmds[i] = new byte[lenOfBulkStr];

in.readBytes(cmds[i]);

// Skip CRLF(\r\n)

in.skipBytes(2);

arg++;

checkpoint();

} else {

throw new IllegalStateException("Invalid argument");

}

}

}

/**

* cmds != null means header decode complete

* arg > 0 means arguments decode has begun

* arg == cmds.length means complete!

*/

private boolean isComplete() {

return (cmds != null)

&& (arg > 0)

&& (arg == cmds.length);

}

/** Send decoded command to next handler */

private void doSendCmdToHandler(List<Object> out) {

System.out.println("RedisCommandDecoder: Send command to next handler");

if (cmds.length == 2) {

out.add(new RedisCommand(new String(cmds[0]), cmds[1]));

} else if (cmds.length == 3) {

out.add(new RedisCommand(new String(cmds[0]), cmds[1], cmds[2]));

} else {

throw new IllegalStateException("Unknown command");

}

}

/** Clean up state info */

private void doCleanUp() {

this.cmd

相关推荐:使用 acl 库编写高效的 C++ redis 客户端应用

一、概述 (可以直接略过此段)redis 最近做为 nosql 数据服务应用越来越广泛,其相对于 memcached 的最大优点是提供了更加丰富的数据结构,所以应用场

s = null;

this.arg = 0;

}

private int readInt(ByteBuf in) {

int integer = 0;

char c;

while ((c = (char) in.readByte()) != '\r') {

integer = (integer * 10) + (c - '0');

}

if (in.readByte() != '\n') {

throw new IllegalStateException("Invalid number");

}

return integer;

}}因为我们只是简单实现set和get命令,所以只可能有一个参数或两个参数:public class RedisCommand {

/** Command name */

private final String name;

/** Optional arguments */

private byte[] arg1;

private byte[] arg2;

public RedisCommand(String name, byte[] arg1) {

this.name = name;

this.arg1 = arg1;

}

public RedisCommand(String name, byte[] arg1, byte[] arg2) {

this.name = name;

this.arg1 = arg1;

this.arg2 = arg2;

}

public String getName() {

return name;

}

public byte[] getArg1() {

return arg1;

}

public byte[] getArg2() {

return arg2;

}

@Override

public String toString() {

return "Command{" +

"name='" + name + '\'' +

", arg1=" + Arrays.toString(arg1) +

", arg2=" + Arrays.toString(arg2) +

'}';

}}2.4 命令执行RedisCommandHandler拿到RedisCommand后,根据命令名执行命令。这里用一个HashMap模拟数据库了,set就往Map里放,get就从里面取。除了执行具体操作,还要根据执行结果返回不同的Reply对象:保存成功:返回:1\r\n。修改成功:返回:0\r\n。说明之前Map中已存在此Key。查询成功:返回Bulk String。具体见后面BulkReply。Key不存在:返回:-1\r\n。@ChannelHandler.Sharablepublic class RedisCommandHandler extends SimpleChannelInboundHandler<RedisCommand> {

private HashMap<String, byte[]> database = new HashMap<String, byte[]>();

@Override

protected void channelRead0(ChannelHandlerContext ctx, RedisCommand msg) throws Exception {

System.out.println("RedisCommandHandler: " + msg);

if (msg.getName().equalsIgnoreCase("set")) {

if (database.put(new String(msg.getArg1()), msg.getArg2()) == null) {

ctx.writeAndFlush(new IntegerReply(1));

} else {

ctx.writeAndFlush(new IntegerReply(0));

}

}

else if (msg.getName().equalsIgnoreCase("get")) {

byte[] value = database.get(new String(msg.getArg1()));

if (value != null && value.length > 0) {

ctx.writeAndFlush(new BulkReply(value));

} else {

ctx.writeAndFlush(BulkReply.NIL_REPLY);

}

}

}}2.5 发送响应RedisReplyEncoder实现比较简单,拿到RedisReply消息后,直接写入到ByteBuf中就可以了。具体的写入方法都在各个RedisReply的具体实现中。public class RedisReplyEncoder extends MessageToByteEncoder<RedisReply> {

@Override

protected void encode(ChannelHandlerContext ctx, RedisReply msg, ByteBuf out) throws Exception {

System.out.println("RedisReplyEncoder: " + msg);

msg.write(out);

}}public interface RedisReply<T> {

byte[] CRLF = new byte[] { '\r', '\n' };

T data();

void write(ByteBuf out) throws IOException;}public class IntegerReply implements RedisReply<Integer> {

private static final char MARKER = ':';

private final int data;

public IntegerReply(int data) {

this.data = data;

}

@Override

public Integer data() {

return this.data;

}

@Override

public void write(ByteBuf out) throws IOException {

out.writeByte(MARKER);

out.writeBytes(String.valueOf(data).getBytes());

out.writeBytes(CRLF);

}

@Override

public String toString() {

return "IntegerReply{" +

"data=" + data +

'}';

}}public class BulkReply implements RedisReply<byte[]> {

public static final BulkReply NIL_REPLY = new BulkReply();

private static final char MARKER = '$';

private final byte[] data;

private final int len;

public BulkReply() {

this.data = null;

this.len = -1;

}

public BulkReply(byte[] data) {

this.data = data;

this.len = data.length;

}

@Override

public byte[] data() {

return this.data;

}

@Override

public void write(ByteBuf out) throws IOException {

// 1.Write header

out.writeByte(MARKER);

out.writeBytes(String.valueOf(len).getBytes());

out.writeBytes(CRLF);

// 2.Write data

if (len > 0) {

out.writeBytes(data);

out.writeBytes(CRLF);

}

}

@Override

public String toString() {

return "BulkReply{" +

"bytes=" + Arrays.toString(data) +

'}';

}}2.6 运行测试服务端跑起来后,用官方的redis-cli就能连上我们的服务,执行一些命令测试一下。看到自己实现的Redis“伪服务端”能够“骗过”redis-cli,还是很有成就感的!127.0.0.1:6379> set name helloworld(integer) 1127.0.0.1:6379> get name"helloworld"127.0.0.1:6379> set name abc123(integer) 0127.0.0.1:6379> get name"abc123"127.0.0.1:6379> get age(nil)3.Netty 4中的那些“坑”因为是初次使用Netty 4,好多网上的资料都是Netty 3或者Netty 4早期版本的,API都不一样了,所以碰到了不少问题,官方文档里也没找到答案,一点点调试、猜测、看源码才摸出点儿“门道”:Handler的基础类:Netty 4里使用SimpleChannelInboundHandler就可以了,之前的API已经不适用了。Inbound和Outbound处理器间的数据交换:Context对象是数据交换的接口,不同的是:Inbound之间是靠fireChannelRead()进行数据交换,但从Inbound到Outbound就要靠writeAndFlush()触发了。Inbound和Outbound的顺序:fireChannelRead()会向后找下一个Inbound处理器,但writeAndFlush()会向前找前一个Outbound处理器。所以在ChannelInitializer中,Outbound要放在SimpleChannelInboundHandler前面才能进行数据交换。@Sharable注解:如果Handler是无状态的话,可以标这个注解。

$(function () {

$('pre.prettyprint code').each(function () {

var lines = $(this).text().split('\n').length;

var $numbering = $('<ul/>').addClass('pre-numbering').hide();

$(this).addClass('has-numbering').parent().append($numbering);

for (i = 1; i <= lines; i++) {

$numbering.append($('<li/>').text(i));

};

$numbering.fadeIn(1700);

});

});

相关推荐:使用 redis_builder 管理 redis 集群

在 <一个 C++ redis 集群管理工具> 中主要讲述了如何使用 redis_builder 工具创建 redis 集群的过程,除此之外,该工具还具有更为强大的 redis 客户端管

用Netty解析Redis网络协议根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!1.RESP协议Re...

相关阅读排行


用户评论

游客

相关内容推荐

最新文章

×

×

请激活账号

为了能正常使用评论、编辑功能及以后陆续为用户提供的其他产品,请激活账号。

您的注册邮箱: 修改

重新发送激活邮件 进入我的邮箱

如果您没有收到激活邮件,请注意检查垃圾箱。