Nodejs结合ProtoBuffer从零实现一个redis-一万字

28次阅读

共计 14435 个字符,预计需要花费 37 分钟才能阅读完成。

写在开头

  • 想学习造轮子技术, 可以看我之前的原创文章大集合: https://mp.weixin.qq.com/s/RsvI5AFzbp3rm6sOlTmiYQ
  • 如果你想领取 3700G 免费学习资料、或者加入技术交流群(禁止发广告),可以文末 + 我微信, 专注技术不闲聊

什么是 protobuffer?

  • protocol buffer 是 Google 的一种独立的数据交换格式,可运用于多种领域。
  • protocolbuffer(以下简称 PB)是 google 的一种数据交换的格式,它独立于语言,独立于平台。
  • google 提供了多种语言的实现:java、c#、c++、go 和 python,每一种实现都包含了相应语言的编译器以及库文件。由于它是一种二进制的格式,比使用 xml 进行数据交换快许多。
  • 可以把它用于分布式应用之间的数据通信或者异构环境下的数据交换。作为一种效率和兼容性都很优秀的二进制数据传输格式,可以用于诸如网络传输、配置文件、数据存储等诸多领域。

总结一下优点

  • 简单说来 Protobuf 的主要优点就是:简洁,快。
  • 为什么这么说呢?
  • 因为 Protocol Buffer 信息的表示非常紧凑,这意味着消息的体积减少,自然需要更少的资源。比如网络上传输的字节数更少,需要的 IO 更少等,从而提高性能。
  • 对于一条消息,用 Protobuf 序列化后的字节序列为:
08 65 12 06 48 65 6C 6C 6F 77
  • 而如果用 XML,则类似这样:
31 30 31 3C 2F 69 64 3E 3C 6E 61 6D 65 3E 68 65 
 6C 6C 6F 3C 2F 6E 61 6D 65 3E 3C 2F 68 65 6C 6C 
 6F 77 6F 72 6C 64 3E 

在 Node.js 中引入 PB

yarn add protobufjs -D
mkdir proto 
cd proto 
vi message.proto

....
//message.proto 文件
package message;
option optimize_for = LITE_RUNTIME;
message Account{
    required string accountName = 1;
    required string pwd = 2; 
}
message AccountList{
    required int32 index = 1;
    repeated Account list = 2;
}

开始使用 PB 协议

  • 引入protobufjs
  • 读取 root 对象
const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
  • 读取定义好的 pb 文件, 动态引入读取
const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const AccountList = root.lookupType("message.AccountList");
const Account = root.lookupType("message.Account");
const accountListObj = AccountList.create();
for (let i = 0; i < 5;i++) {const accountObj = Account.create();
  accountObj.accountName = "前端巅峰" + i;
  accountObj.pwd = "Peter 酱要比技术胖还骚" + i;
  accountListObj.list.push(accountObj);
}
const buffer = AccountList.encode(accountListObj).finish();

console.log(buffer)
  • 使用 nodemon 启动项目

  • 打印出了 Buffer, 此时转化成 string
const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const AccountList = root.lookupType("message.AccountList");
const Account = root.lookupType("message.Account");
const accountListObj = AccountList.create();
const accountObj = Account.create();
accountObj.accountName = "前端巅峰";
accountObj.pwd = "Peter 酱要比技术胖还骚";
accountObj.test = "大保健越做身体越虚是为什么";
accountListObj.list.push(accountObj);
const buffer = AccountList.encode(accountListObj).finish();
console.log(buffer.toString());
  • 打印得到

  • 那么请问, 大宝剑越做身体越虚弱, 是为什么?

引入 socket 通信, 二进制更好的支持

  • 使用原生 net 模块的 socket 通信, 由于是实现 redis, 这里不使用udp 通讯, 而是基于可靠的 TCP, 先编写redis 服务端代码
const net = require("net");
const listenPort = 6380; // 监听端口
const server = net
  .createServer(function (socket) {
    // 创建 socket 服务端
    console.log("connect:" + socket.remoteAddress + ":" + socket.remotePort);
    socket.setKeepAlive(true);
    socket.setEncoding("utf-8");
    // 接收到数据
    socket.on("data", function (data) {console.log("client send:" + data);
    });
    socket.write("Hello client!\r\n");
    // 数据错误事件
    socket.on("error", function (exception) {socket.end();
    });
    // 客户端关闭事件
    socket.on("close", function (data) {});
  })
  .listen(listenPort);
// 服务器监听事件
server.on("listening", function () {console.log("server listening:" + server.address().port);
});
// 服务器错误事件
server.on("error", function (exception) {console.log("server error:" + exception);
});
  • redis默认端口 6379, 我们监听在6380 端口, 心跳保活, 应用层做keep-alive,socket.setKeepAlive(true),保持长链接

编写 redis 客户端

  • 引入 Socket 通信
const {Socket} = require("net");
// 其他引入 pb 文件的代码不变
  • 引入 pb 文件的代码不变, 客户端一份, 服务端一份, 双工通讯两边 pb 文件都要各自有一份
const port = 6380;
const host = "127.0.0.1";
const client = new Socket();
client.setKeepAlive(true);
client.setEncoding("utf-8");
// 连接到服务端
client.connect(port, host, function () {client.write("hello server");
  // 向端口写入数据到达服务端
});
client.on("data", function (data) {console.log("from server:" + data);
  // 得到服务端返回来的数据
});
client.on("error", function (error) {
  // 错误出现之后关闭连接
  console.log("error:" + error);
  client.destory();});
client.on("close", function () {
  // 正常关闭连接
  console.log("Connection closed");
});
  • 通过 socket 链接 6380 端口服务器, 建立长链接

应用层心跳保活 & 重连

  • 重新定义 pb 文件,PingPong
package message;
syntax = "proto3";

message PingPong {
    string message_type = 1; // 会变成 messageType
    string ping = 2; 
    string pong = 3; 
}
  • 客户端编译 pb 文件,使用 pb 协议进行通讯与服务端
const root = ProtoBufJs.loadSync('./proto/message.proto');
const PingPong = root.lookupType('message.PingPong');
 setInterval(() => {const payload = { message_type: '1', ping: '1', pong: '2'};
    const errMsg = PingPong.verify(payload);
    if (errMsg) throw Error(errMsg);
    const message = PingPong.create(payload);
    const buffer = PingPong.encode(message).finish();
    client.write(buffer);
  }, 3000);
  • 每隔 3 秒发送心跳包一次

服务端接受心跳

  • 引入 pb
const root = ProtoBufJs.loadSync('./proto/message.proto');
const PingPong = root.lookupType('message.PingPong');
  • 接受心跳包
const server = createServer(function (socket) {socket.setKeepAlive(true);
  // 创建 socket 服务端
  // 接收到数据
  socket.on('data', function (data) {const decodedMessage = PingPong.decode(data);
    console.log(decodedMessage, 'decodedMessage');
  });
  socket.write('Hello client!\r\n');
  // 数据错误事件
  socket.on('error', function (exception) {console.log('socket error:' + exception);
    socket.end();});
  // 客户端关闭事件
  socket.on('close', function (data) {console.log('client closed!');
  });
}).listen(listenPort);
  • 此时已经能接收到 PB 协议传输的 Buffer,并且解析了

  • 心跳保持,客户端发送心跳
  timer = setInterval(() => {
    count++;
    const payload = {messageType: '1', ping: '1'};
    const errMsg = PingPong.verify(payload);
    if (errMsg) throw Error(errMsg);
    const message = PingPong.create(payload);
    const buffer = PingPong.encode(message).finish();
    client.write(buffer);
  }, 3000);
  • 服务端返回心跳
 socket.on('data', function (data) {const decodedMessage = PingPong.decode(data);
    console.log(decodedMessage,'decodedMessage')
    if(decodedMessage.messageType ==='1'){console.log('进入判断')
      const payload = {messageType: '1', pong: '1'};
      const errMsg = PingPong.verify(payload);
      if (errMsg) throw Error(errMsg);
      const message = PingPong.create(payload);
      const buffer = PingPong.encode(message).finish();
      socket.write(buffer);
    }
  });
  • 客户端记录心跳,做超时、断了的处理
client.on('data', function (data) {const decodedMessage = PingPong.decode(data);
  if (decodedMessage.messageType === '1') {console.log('收到心跳回包');
    count = 0;
  }
  console.log('from server:' + decodedMessage.messageType);
  // 得到服务端返回来的数据
});
  • 发送心跳时候判断,三次后收不到心跳,抛出错误,不再发送心跳
  timer = setInterval(() => {if (count > 3) {clearInterval(timer);
      client.end();
      throw Error('timeout')
    }
    count++;
    const payload = {messageType: '1', ping: '1'};
    const errMsg = PingPong.verify(payload);
    if (errMsg) throw Error(errMsg);
    const message = PingPong.create(payload);
    const buffer = PingPong.encode(message).finish();
    client.write(buffer);
  }, 3000);
  • 服务端故意不回复心跳
  socket.write(buffer);
  • 客户端抛出错误, 取消心跳发送, 断开 socket 链接

  • 此时应该还有重连机制, 这里就不做处理了,还有发送队列这些

实现 redis 的 get,set 方法

  • 数据存储, 服务端用 Map 类型存储
  • 传输使用 PB 协议
  • 接受到消息回复ACK

定义数据传输的Payload pb 字段

  • 定义字段
message Data {
    string message_type = 1; // 会变成 messageType
    Payload data = 2;
}


message Payload {
    required string key = 1;
    required string value =2;
}
  • 此时定义 RedisSet 函数:
const Data = root.lookupType('message.Data');
function RedisSet() {const msg = { messageType: '2', data: { key: '1', value: '2'} };
  const errMsg = Data.verify(msg);
  if (errMsg) throw Error(errMsg);
  const message = Data.create(msg);
  const buffer = Data.encode(message).finish();
  client.write(buffer);
}
  • 服务端 decode 解析反序列化
  socket.on('data', function (data) {const decodedMessage = PingPong.decode(data);
    console.log(decodedMessage,'decodedMessage');
    if(decodedMessage.messageType ==='1'){const payload = { messageType: '1', pong: '1'};
      const errMsg = PingPong.verify(payload);
      if (errMsg) throw Error(errMsg);
      const message = PingPong.create(payload);
      const buffer = PingPong.encode(message).finish();
      socket.write(buffer);
    }
  });
  • 反序列化成功

  • 此时已经拿到了数据, 但是细心观察的会发现,我们拿错了反序列的对象去处理,导致数据有问题, 那么我们需要告知收包方应该用什么对象去反序列化
  • 此时最佳方案应该定义 common 字段去先反序列化一次
message Common {string message_type = 1;}
  • 在服务端先反序列化一次,用common, 得到 messageType 后再进行处理, 再反序列化一次
  socket.on('data', function (data) {const res = Common.decode(data);
    if (res.messageType=== '1') {const payload = { messageType: '1', pong: '1'};
      const errMsg = PingPong.verify(payload);
      if (errMsg) throw Error(errMsg);
      const message = PingPong.create(payload);
      const buffer = PingPong.encode(message).finish();
      socket.write(buffer);
    } else if(res.messageType=== '2'){const message = Data.decode(data)
        const payload = message.data;
        console.log(payload.key,'payload');
        M.set(payload.key,payload.value);
        console.log(M,'m')
    }
  });
  • 完成简单的 set 方法

  • 定义 RedisGet 方法:
const M = new Map();
M.set('1','Peter 酱牛逼')

function RedisGet() {const msg = { messageType: '3', data: { key: '1'} };
  const errMsg = Data.verify(msg);
  if (errMsg) throw Error(errMsg);
  const message = Data.create(msg);
  const buffer = Data.encode(message).finish();
  client.write(buffer);
}
  • 服务端对类型 messageType 为 ’3’ 的进行处理
else if (res.messageType === '3') {const message = Query.decode(data);
      const res = M.get(message.key);
      console.log(res, 'res');
    }

  • 此时 get 方法完成,得到数据,再定义一个 GetData 传输下,先序列化再反序列化就完成了? 肯定不会这么简单
  • redis 的 set、get 的非常高频的操作, 即便是缓存, 不是存入数据库,但还是有失败风险, 因为我们是通过 socket 通讯, 如果网络抖动或者其他原因导致通讯失败, 这个数据没有进入 cache, 那么就有问题

set 方法应该有 cb(回调),get 方法应该有返回值

  • 基于以上两种需求, 需要设计新的模式去完成这个 set、get 功能
  • 无论成功、失败都能知道结果

真正的开始实现 Redis

  • 首先确定通讯依然使用 socket, 长连接
  • 心跳保活需要
  • 需要引入发送队列
  • set 能触发 cb,get 能返回数据(基于promise | generator|async)
  • 基于 pb 协议传输
  • 有 ACK 回复机制, 这样能确保 cb 调用

处理队列

  • set 和 set 的回调队列
  • 我之前想 set 成功后, 应该把数据在客户端保护一份,这样 redis.get 就可以直接拿到数据了, 不需要通过 socket, 后面考虑到多个机器连接 redis,应该保持数据一致性, 此处应该有多种方法保证数据一致性..
const cbQueue = []; // 回调队列
const setQueue = []; //set 的队列
const getQueue = []; //get 的队列
  • 实现 set 队列, 触发cb, 改造 redisSet
function RedisSet(data, cb) {cbQueue.push(cb);
  setQueue.push(data);
  console.log(cbQueue, setQueue, "queue");
  const errMsg = Data.verify(data);
  if (errMsg) throw Error(errMsg);
  const message = Data.create(data);
  const buffer = Data.encode(message).finish();
  client.write(buffer);
}
  • 服务端收到 set 后,在 Map 中追加数据, 用 socket 写入通入客户端
else if (res.messageType === '2') {const message = Data.decode(data);
      const payload = message.data;
      M.set(payload.key, payload.value);
    } 

M.set 后, 使用 socket 通知客户端缓存写入成功

  • 首先定义 pb 字段, 我们使用 message_type = “5” 来通知
message setSucceed {string message_type = 1;}
const msg = {messageType: "5"};
const errMsg = setSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = setSucceed.create(msg);
const buffer = setSucceed.encode(m).finish();
socket.write(buffer);
  • 前端触发 set 队列的 cb, 并且消费这个队列
  RedisSet(data, () => {console.log("set 成功, 触发 cb");
      });
      
 else if (decodedMessage.messageType === "5") {const cb = cbQueue.shift();
      cb && cb();}
  • 结果, 符合预期

但是这个操作, 是有 BUG 的

  • 因为 socket 写入都是异步, 等返回的时候, 那么就有可能乱序, 这里需要加入 ACK 回复机制
  • 在客户端 set 的时候, 生成一个 UUID, 将这个 UUID 带着给服务端, 服务端的 Map 数据存储完成后, 就可以将这个 UUID 带着回来给客户端(相当于 ACK 机制)
  • 客户端接受到 ACK, 触发 cbQueue 中的 cb(此时将 cbQueue 数组类型改成 Map, 方便处理), 触发完成后 remove 掉cb 即可
  • 加入 UUID
yarn add node-uuid
const uuid = require('node-uuid');

// v1 根据时间戳和随机数生成的 uuid
const creatuuid= uuid.v1()
  • 修改 Data 的 pb 文件, 增加 uuid 字段
message Data {
     string message_type = 1; // 会变成 messageType
     string uuid = 2;
     Payload data = 3;
}
  • 修改 set 方法,每次 set 用 UUID 生成 key,value 为 cb, 存储在 Map 中
function RedisSet(data, cb) {
  // v1 根据时间戳和随机数生成的 uuid
  const creatuuid = uuid.v1();
  data.uuid = creatuuid;
  cbQueue.set(creatuuid, cb);
  const errMsg = Data.verify(data);
  if (errMsg) throw Error(errMsg);
  const message = Data.create(data);
  const buffer = Data.encode(message).finish();
  client.write(buffer);
}
  • 服务端修改,返回 ACK 字段,通知客户端消费 cb
else if (res.messageType === '2') {const message = Data.decode(data);
      const payload = message.data;
      M.set(payload.key, payload.value);
      const msg = {messageType: '5', uuid: message.uuid};
      const errMsg = setSucceed.verify(msg);
      if (errMsg) throw Error(errMsg);
      const m = setSucceed.create(msg);
      const buffer = setSucceed.encode(m).finish();
      socket.write(buffer);
    } 
  • 客户端收到 set 成功的 ACK, 根据 UUId,消费cb
 else if (decodedMessage.messageType === '5') {const res = setSucceed.decode(data);
      const cb = cbQueue.get(res.uuid);
      cb && cb() && cbQueue.remove(res.uuid);
    }

这样我们 set 触发 cb 已经完成, 剩下 get 得到返回值

  • 其实这个 get, 也要推敲一下, 我当初想粗糙点, 直接把所有数据同步到客户端, 然后客户端根据 setQueue & cbQueue 去追加数据, 后面觉得很不优雅, 因为 redis 还有集群, 数据同步, 预热,两种不同数据持久化等等
  • 此处可以通过 curl、http 请求等形式拿到, 因为我没看过 redis 源码, 不清楚怎么实现的
  • 但是基于 Node.js 的 redis 使用,是直接通过 redis.get(),传入回调函数后得到一个数据, 没有使用 promise 和 await(我记得是这样)

定义 get 的 pb 字段

  • 定义 Query
message Query {
    string message_type = 1; 
    string key = 2;
    string uuid =3;
}
  • 定义 get 方法
get = function (key, cb) {
    // v1 根据时间戳和随机数生成的 uuid
    const creatuuid = uuid.v1();
    getCbQueue.set(creatuuid, cb);
    const msg = {messageType: '6', key, uuid: creatuuid};
    const errMsg = Query.verify(msg);
    if (errMsg) throw Error(errMsg);
    const message = Query.create(msg);
    const buffer = Query.encode(message).finish();
    TCPClient.write(buffer);
  };
  • 首先发送 messageType 为 6 的包给服务端, 服务端对 6 的 type 做处理
else if (res.messageType === "6") {const message = Query.decode(data);
      const res = M.get(message.key);
      const msg = {messageType: "6", uuid: message.uuid, data: res};
      const errMsg = getSucceed.verify(msg);
      if (errMsg) throw Error(errMsg);
      const m = getSucceed.create(msg);
      const buffer = getSucceed.encode(m).finish();
      socket.write(buffer);
    }
  • 如果是 6,代表是客户端的 get 操作,我们先去 Map 中查询, 然后返回通知给客户端,type 还是 6
  • 客户端接受到 6 的 msgtype 后,通过拿到的 data 和 uuid,调用 getCbQueue 中的对应回掉,并且 delete 掉
else if (decodedMessage.messageType === '6') {const res = getSucceed.decode(data);
        const cb = getCbQueue.get(res.uuid);
        cb && cb(res.data);
        getCbQueue.delete(res.uuid);
      }

很多人想看我真实的代码, 我贴出来我优化后的代码吧, 我觉得真的很整洁.

  • 通过类实现 redis,静态方法定义

  • 如何使用我的 Redis?
const Redis = require('./redis');
const port = 6380;
const host = '127.0.0.1';
const RedisStore = Redis.connect(port, host);

const data = {messageType: '2', data: { key: '1', value: '2'} };

RedisStore.set(data, () => {console.log('set 成功, 触发 cb');
});

RedisStore.get('1', (data) => {console.log('get 成功 data:', data);
});

  • 达到预期

还缺守护进程、数据持久化

  • 守护进程, 我之前写过 cluster 源码解析, 用 pm2 docker 谁都会,但是真的要自己实现,还是要思考一下
  • 有兴趣学习的,可以看我之前的解析 Cluster 源码、PM2 原理文章https://segmentfault.com/a/1190000021230376

  • PM2 这个轮子造起来,可能比 redis 不相上下, 以后有机会可以写一个, 我们今天直接用 PM2 启动即可达到守护进程效果
pm2 start server.js

实现 redis 数据持久化

  • redis 数据持久化两种方式

    • RDB:指定的时间间隔内保存数据快照
    • AOF:先把命令追加到操作日志的尾部,保存所有的历史操作
  • 这里持久化,其实有点麻烦,redis 的 key 数据类型非常丰富
  • redis 数据持久化用来做什么?

    • redis 数据存储在内存中,如果服务器重启或者 redis 挂了 / 重启,如果不做数据持久化,那么数据就丢了

先是实现 AOF, 追加到日志尾部

  • 在服务端接受到 redis.set 的时候进行日志追加
 M.set(payload.key, payload.value);
      fs.appendFile(
        './redis.db',
        `${payload.key},${payload.value}\n`,
        (error) => {if (error) return console.log('追加文件失败' + error.message);
          console.log('追加成功');
        }
      );
  • 结果

  • 这样写是有问题的, 到时候取值的时候不好取,这里可以用到我之前手写富文本编辑器的原理, 用零宽字符占位, 然后读取数据时候再替换分割~

什么是零宽度字符

  • 一种不可打印的 Unicode 字符, 在浏览器等环境不可见, 但是真是存在, 获取字符串长度时也会占位置, 表示某一种控制功能的字符.
  • 常见的零宽字符有哪些
  • 零宽空格(zero-width space, ZWSP)用于可能需要换行处。
    Unicode: U+200B  HTML: &#8203;
  • 零宽不连字 (zero-width non-joiner,ZWNJ)放在电子文本的两个字符之间,抑制本来会发生的连字,而是以这两个字符原本的字形来绘制。
    Unicode: U+200C  HTML: &#8204;
  • 零宽连字(zero-width joiner,ZWJ)是一个控制字符,放在某些需要复杂排版语言(如阿拉伯语、印地语)的两个字符之间,使得这两个本不会发生连字的字符产生了连字效果。
    Unicode: U+200D  HTML: &#8205;
  • 左至右符号(Left-to-right mark,LRM)是一种控制字符,用于计算机的双向文稿排版中。
    Unicode: U+200E  HTML: &lrm; &#x200E; 或 &#8206;
  • 右至左符号(Right-to-left mark,RLM)是一种控制字符,用于计算机的双向文稿排版中。
    Unicode: U+200F  HTML: &rlm; &#x200F; 或 &#8207;
  • 字节顺序标记(byte-order mark,BOM)常被用来当做标示文件是以 UTF-8、UTF-16 或 UTF-32 编码的标记。
    Unicode: U+FEFF
  • 零宽度字符在 JavaScript 的应用
  • 数据防爬
  • 将零宽度字符插入文本中, 干扰关键字匹配。爬虫得到的带有零宽度字符的数据会影响他们的分析,但不会影响用户的阅读数据。
  • 信息传递
  • 将自定义组合的零宽度字符插入文本中,用户复制后会携带不可见信息,达到传递作用。

使用零宽字符

  • 我喜欢用\u200b\,因为它够 2b
 `${payload.key},${payload.value}\u200b\n`,
  • 插入持久化效果

数据预热

  • 在服务器监听端口事件中进行数据预热, 读取磁盘数据到内存中
// 服务器监听事件
server.on('listening', function () {fs.readFile('./redis.db', (err, data) => {console.log(data.toString(), 'xxx');
  });
  console.log('server listening:' + server.address().port);
});
  • 结果 符合预期

  • 上面这样写,其实有问题,为了更好的分割提取磁盘冷数据,我换了下分割的零宽字符
 `${payload.key}-${payload.value}\u200b`,
  • 插入后的数据变成了这样

  • 读取数据算法, 也是要思考下
// 服务器监听事件
server.on('listening', function () {fs.readFile('./redis.db', (err, data) => {const string = data.toString();
    if (string.length > 0) {const result = string.split('\u200b');
      for (let i = 0; i < result.length; i++) {const res = result[i];
        for (let j = 0; j < res.length; j++) {if (res[j] === '-') {continue;}
          j === 0 ? M.set(res[j], null) : M.set(res[j - 2], res[j]);
        }
      }
    }
  });
  console.log('server listening:' + server.address().port);
});
  • 最终效果, 符合预期

  • 在 redis 出错的时候,将数据刷入磁盘中以及定期持久化数据,如果要实现,也可以类似的思路, 当然这并不是 redis 的真正实现, 只是一个模拟.

如果感觉写得不错,关注下微信公众号 [前端巅峰]

  • 我是 Peter,架构设计过 20 万人端到端加密超级群功能的桌面 IM 软件, 我的微信:CALASFxiaotan
  • 另外欢迎收藏我的资料网站: 前端生活社区:https://qianduan.life, 感觉对你有帮助,可以右下角点个在看,关注一波公众号:[前端巅峰]

正文完
 0