乐趣区

关于redis:聊聊claudb的SlaveReplication

本文主要研究一下 claudb 的 SlaveReplication

SlaveReplication

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/SlaveReplication.java

/**
 * Slave side of the replication link: connects to a master through a {@link RespClient},
 * requests a full sync when the connection is established, and applies whatever the master
 * sends back to the local server.
 *
 * <p>The current link state (host, port and "connected"/"disconnected") is stored in the admin
 * database under the {@code "master"} key.
 */
public class SlaveReplication implements RespCallback {

  private static final DatabaseKey MASTER_KEY = safeKey("master");

  private static final Logger LOGGER = LoggerFactory.getLogger(SlaveReplication.class);

  private static final String SYNC_COMMAND = "SYNC";

  private final RespClient client;
  private final DBServerContext server;
  private final DBCommandProcessor processor;
  private final String host;
  private final int port;

  /**
   * Creates the replication handler; no connection is opened until {@link #start()} is called.
   *
   * @param server local server context that receives the replicated data
   * @param session session used to execute the commands received from the master
   * @param host master host
   * @param port master port
   */
  public SlaveReplication(DBServerContext server, Session session, String host, int port) {
    this.server = server;
    this.host = host;
    this.port = port;
    this.client = new RespClient(host, port, this);
    this.processor = new DBCommandProcessor(server, session);
  }

  /**
   * Marks the local server as a slave, records the link as disconnected and then starts the
   * client.
   */
  public void start() {
    // Publish the initial "disconnected" state BEFORE starting the client. If the client invokes
    // onConnect() before start() returns, writing the state afterwards would overwrite the
    // "connected" state that onConnect() has just stored.
    server.setMaster(false);
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
    client.start();
  }

  /** Stops the client and marks the local server as a master again. */
  public void stop() {
    client.stop();
    server.setMaster(true);
  }

  /** Sends the {@code SYNC} command to the master and records the link as connected. */
  @Override
  public void onConnect() {
    LOGGER.info("Connected with master");
    client.send(array(string(SYNC_COMMAND)));
    server.getAdminDatabase().put(MASTER_KEY, createState(true));
  }

  /** Records the link as disconnected. */
  @Override
  public void onDisconnect() {
    LOGGER.info("Disconnected from master");
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
  }

  /**
   * Dispatches a token received from the master: a string token is imported as an RDB payload,
   * an array token is executed as a command.
   *
   * @param token token received from the master
   */
  @Override
  public void onMessage(RedisToken token) {
    token.accept(RedisTokenVisitor.builder()
        .onString(string -> {
          processRDB(string);
          return null;
        })
        .onArray(array -> {
          processor.processCommand(array);
          return null;
        }).build());
  }

  /** Imports the RDB payload carried by the token; an I/O failure is logged, not rethrown. */
  private void processRDB(StringRedisToken token) {
    try {
      SafeString value = token.getValue();
      server.importRDB(toStream(value));
      LOGGER.info("loaded RDB file from master");
    } catch (IOException e) {
      LOGGER.error("error importing RDB file", e);
    }
  }

  /** Exposes the bytes of the given value as an input stream. */
  private InputStream toStream(SafeString value) {
    return new ByteBufferInputStream(value.getBytes());
  }

  /** Builds the hash stored under the master key: host, port and connection state. */
  private DatabaseValue createState(boolean connected) {
    return hash(entry(safeString("host"), safeString(host)),
        entry(safeString("port"), safeString(valueOf(port))),
        entry(safeString("state"), safeString(connected ? "connected" : "disconnected")));
  }
}
  • SlaveReplication 实现了 RespCallback 接口,其构造器创建 RespClient、DBCommandProcessor;其 start 方法执行 client.start(),标记 server 为 slave 及 master connect 为 false;其 stop 方法执行 client.stop() 及标记 server 为 master;其 onConnect 方法执行 client.send(array(string(SYNC_COMMAND))) 并标记 master connect 为 true;其 onDisconnect 标记 master connect 为 false;其 onMessage 方法在 onString 时执行 processRDB,在 onArray 时执行 processor.processCommand(array);processRDB 方法执行 server.importRDB(toStream(value))

DBCommandProcessor

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/DBCommandProcessor.java

/**
 * Executes commands received as RESP array tokens against a {@link DBServerContext}, on behalf
 * of a fixed {@link Session}.
 */
public class DBCommandProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(DBCommandProcessor.class);

  private final DBServerContext server;
  private final Session session;

  /**
   * Creates a processor bound to a placeholder session with id {@code "dummy"}.
   *
   * @param server server context used to look up and execute commands
   */
  public DBCommandProcessor(DBServerContext server) {
    this(server, new DefaultSession("dummy", null));
  }

  /**
   * Creates a processor bound to the given session.
   *
   * @param server server context used to look up and execute commands
   * @param session session attached to every request built by this processor
   */
  public DBCommandProcessor(DBServerContext server, Session session) {
    this.server = server;
    this.session = session;
  }

  /**
   * Executes the command encoded in the given array token. The first element is the command
   * name and the remaining elements are its parameters.
   *
   * <p>Malformed input (an empty array, a first element that is not a string token, or a
   * command name with no value) is logged and ignored instead of failing with a
   * {@link ClassCastException} or {@link NullPointerException}. A command name the server does
   * not know is silently ignored.
   *
   * @param token array token holding the command name followed by its parameters
   */
  public void processCommand(ArrayRedisToken token) {
    Sequence<RedisToken> array = token.getValue();

    // instanceof is false both for an empty array (null) and for a non-string first element,
    // so the cast below can no longer fail.
    RedisToken first = array.stream().findFirst().orElse(null);
    if (!(first instanceof StringRedisToken)) {
      LOGGER.warn("ignoring malformed command, name is not a string token: {}", first);
      return;
    }

    StringRedisToken commandToken = (StringRedisToken) first;
    // NOTE(review): presumably a null bulk string carries a null value — guard before toString().
    if (commandToken.getValue() == null) {
      LOGGER.warn("ignoring malformed command, name has no value");
      return;
    }

    List<RedisToken> paramTokens = array.stream().skip(1).collect(toList());

    LOGGER.debug("new command recieved: {}", commandToken);

    RespCommand command = server.getCommand(commandToken.getValue().toString());

    if (command != null) {
      command.execute(request(commandToken, paramTokens));
    }
  }

  /** Builds the request for the given command name and parameter tokens. */
  private Request request(StringRedisToken commandToken, List<RedisToken> array) {
    return new DefaultRequest(server, session, commandToken.getValue(), arrayToList(array));
  }

  /** Converts the parameter tokens to their values using a visitor that handles string tokens. */
  private ImmutableArray<SafeString> arrayToList(List<RedisToken> request) {
    RedisTokenVisitor<SafeString> visitor = RedisTokenVisitor.<SafeString>builder()
        .onString(StringRedisToken::getValue).build();
    return ImmutableArray.from(visit(request.stream(), visitor));
  }
}
  • DBCommandProcessor 的 processCommand 方法解析 commandToken 及 paramTokens,然后通过 server.getCommand(commandToken.getValue().toString()) 找到对应的 RespCommand,然后执行 command.execute(request(commandToken, paramTokens))

小结

SlaveReplication 实现了 RespCallback 接口,其构造器创建 RespClient、DBCommandProcessor;其 start 方法执行 client.start(),标记 server 为 slave 及 master connect 为 false;其 stop 方法执行 client.stop() 及标记 server 为 master;其 onConnect 方法执行 client.send(array(string(SYNC_COMMAND))) 并标记 master connect 为 true;其 onDisconnect 标记 master connect 为 false;其 onMessage 方法在 onString 时执行 processRDB,在 onArray 时执行 processor.processCommand(array);processRDB 方法执行 server.importRDB(toStream(value))

doc

  • SlaveReplication
退出移动版