本文次要钻研一下claudb的exportRDB

exportRDB

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

public class DBServerState {  //......  public void exportRDB(OutputStream output) throws IOException {    RDBOutputStream rdb = new RDBOutputStream(output);    rdb.preamble(RDB_VERSION);    for (int i = 0; i < databases.size(); i++) {      Database db = databases.get(i);      if (!db.isEmpty()) {        rdb.select(i);        rdb.dabatase(db);      }    }    rdb.end();  }  //......}
  • exportRDB办法先通过rdb.preamble(RDB_VERSION)写入redis魔数及版本;而后遍历databases,挨个执行rdb.select(i)写入SELECT及db的长度,在执行rdb.dabatase(db),遍历entry,挨个按expiredAt、type、key、value写入数据;end办法写入END_OF_STREAM,而后再写入checksum

RDBOutputStream

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBOutputStream.java

public class RDBOutputStream {  private static final byte[] REDIS = safeString("REDIS").getBytes();  private static final int TTL_MILLISECONDS = 0xFC;  private static final int END_OF_STREAM = 0xFF;  private static final int SELECT = 0xFE;  private final CheckedOutputStream out;  public RDBOutputStream(OutputStream out) {    super();    this.out = new CheckedOutputStream(out, new CRC64());  }  public void preamble(int version) throws IOException {    out.write(REDIS);    out.write(version(version));  }  private byte[] version(int version) {    StringBuilder sb = new StringBuilder(String.valueOf(version));    for (int i = sb.length(); i < Integer.BYTES; i++) {      sb.insert(0, '0');    }    return sb.toString().getBytes(StandardCharsets.UTF_8);  }  public void select(int db) throws IOException {    out.write(SELECT);    length(db);  }  public void dabatase(Database db) throws IOException {    for (Tuple2<DatabaseKey, DatabaseValue> entry : db.entrySet()) {      value(entry.get1(), entry.get2());    }  }  private void value(DatabaseKey key, DatabaseValue value) throws IOException {    expiredAt(value.getExpiredAt());    type(value.getType());    key(key);    value(value);  }  private void expiredAt(Instant expiredAt) throws IOException {    if (expiredAt != null) {      out.write(TTL_MILLISECONDS);      out.write(ByteUtils.toByteArray(expiredAt.toEpochMilli()));    }  }  private void type(DataType type) throws IOException {    out.write(type.ordinal());  }  private void key(DatabaseKey key) throws IOException {    string(key.getValue());  }  private void value(DatabaseValue value) throws IOException {    switch (value.getType()) {    case STRING:      string(value.getString());      break;    case LIST:      list(value.getList());      break;    case HASH:      hash(value.getHash());      break;    case SET:      set(value.getSet());      break;    case ZSET:      zset(value.getSortedSet());      break;    default:      break;    }  }  private void length(int length) throws IOException {    if (length < 0x40) {      // 1 byte: 00XXXXXX      out.write(length);    } else if (length < 0x4000) {      // 2 bytes: 01XXXXXX XXXXXXXX      int b1 = length >> 8;      int b2 = length & 0xFF;      out.write(0x40 | b1);      out.write(b2);    } else {      // 5 bytes: 10...... XXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX      out.write(0x80);      out.write(toByteArray(length));    }  }  private void string(String value) throws IOException {    string(safeString(value));  }  private void string(SafeString value) throws IOException {    byte[] bytes = value.getBytes();    length(bytes.length);    out.write(bytes);  }  private void string(double value) throws IOException {    string(String.valueOf(value));  }  private void list(ImmutableList<SafeString> value) throws IOException {    length(value.size());    for (SafeString item : value) {      string(item);    }  }  private void hash(ImmutableMap<SafeString, SafeString> value) throws IOException {    length(value.size());    for (Tuple2<SafeString, SafeString> entry : value.entries()) {      string(entry.get1());      string(entry.get2());    }  }  private void set(ImmutableSet<SafeString> value) throws IOException {    length(value.size());    for (SafeString item : value) {      string(item);    }  }  private void zset(NavigableSet<Entry<Double, SafeString>> value) throws IOException {    length(value.size());    for (Entry<Double, SafeString> item : value) {      string(item.getValue());      string(item.getKey());    }  }  public void end() throws IOException {    out.write(END_OF_STREAM);    out.write(toByteArray(out.getChecksum().getValue()));    out.flush();  }}
  • RDBOutputStream的结构器用CheckedOutputStream包装了OutputStream;其dabatase办法遍历db.entrySet(),挨个执行value办法;value办法先别离执行expiredAt、type、key、value办法;value办法针对STRING、LIST、HASH、SET、ZSET这几种value类型做了不同的解决;string办法间接写入string;list办法先写入list大小,再挨个写入list元素;hash办法先写入hash大小,再挨个写入key和value;set先写入set大小,在挨个写入set元素;zset先写入zset大小,再挨个写入value和score;end办法写入END_OF_STREAM,而后再写入checksum

CheckedOutputStream

java.base/java/util/zip/CheckedOutputStream.java

publicclass CheckedOutputStream extends FilterOutputStream {    private Checksum cksum;    /**     * Creates an output stream with the specified Checksum.     * @param out the output stream     * @param cksum the checksum     */    public CheckedOutputStream(OutputStream out, Checksum cksum) {        super(out);        this.cksum = cksum;    }    /**     * Writes a byte. Will block until the byte is actually written.     * @param b the byte to be written     * @exception IOException if an I/O error has occurred     */    public void write(int b) throws IOException {        out.write(b);        cksum.update(b);    }    /**     * Writes an array of bytes. Will block until the bytes are     * actually written.     * @param b the data to be written     * @param off the start offset of the data     * @param len the number of bytes to be written     * @exception IOException if an I/O error has occurred     */    public void write(byte[] b, int off, int len) throws IOException {        out.write(b, off, len);        cksum.update(b, off, len);    }    /**     * Returns the Checksum for this output stream.     * @return the Checksum     */    public Checksum getChecksum() {        return cksum;    }}
  • CheckedOutputStream继承了FilterOutputStream,其结构器要求输出OutputStream及Checksum,每次write的时候都会执行cksum.update;其getChecksum办法间接返回cksum

CRC64

public class CRC64 implements Checksum {  private static final int LOOKUPTABLE_SIZE = 256;  private static final long POLY64REV = 0xC96C5795D7870F42L;  private static final long LOOKUPTABLE[] = new long[LOOKUPTABLE_SIZE];  private long crc = -1;  static {    for (int b = 0; b < LOOKUPTABLE.length; ++b) {      long r = b;      for (int i = 0; i < Long.BYTES; ++i) {        if ((r & 1) == 1) {          r = (r >>> 1) ^ POLY64REV;        } else {          r >>>= 1;        }      }      LOOKUPTABLE[b] = r;    }  }  @Override  public void update(int b) {    crc = LOOKUPTABLE[((b & 0xFF) ^ (int) crc) & 0xFF] ^ (crc >>> 8);  }  @Override  public void update(byte[] buf, int off, int len) {    int end = off + len;    while (off < end) {      crc = LOOKUPTABLE[(buf[off++] ^ (int) crc) & 0xFF] ^ (crc >>> 8);    }  }  @Override  public long getValue() {    return ~crc;  }  @Override  public void reset() {    crc = -1;  }}
  • CRC64实现了Checksum接口,其update办法会借用LOOKUPTABLE更新crc

小结

exportRDB办法先通过rdb.preamble(RDB_VERSION)写入redis魔数及版本;而后遍历databases,挨个执行rdb.select(i)写入SELECT及db的长度,在执行rdb.dabatase(db),遍历entry,挨个按expiredAt、type、key、value写入数据;end办法写入END_OF_STREAM,而后再写入checksum

doc

  • DBServerState