This article takes a look at claudb's SlaveReplication.

SlaveReplication

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/SlaveReplication.java

public class SlaveReplication implements RespCallback {

  private static final DatabaseKey MASTER_KEY = safeKey("master");

  private static final Logger LOGGER = LoggerFactory.getLogger(SlaveReplication.class);

  private static final String SYNC_COMMAND = "SYNC";

  private final RespClient client;
  private final DBServerContext server;
  private final DBCommandProcessor processor;
  private final String host;
  private final int port;

  public SlaveReplication(DBServerContext server, Session session, String host, int port) {
    this.server = server;
    this.host = host;
    this.port = port;
    this.client = new RespClient(host, port, this);
    this.processor = new DBCommandProcessor(server, session);
  }

  public void start() {
    client.start();
    server.setMaster(false);
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
  }

  public void stop() {
    client.stop();
    server.setMaster(true);
  }

  @Override
  public void onConnect() {
    LOGGER.info("Connected with master");
    client.send(array(string(SYNC_COMMAND)));
    server.getAdminDatabase().put(MASTER_KEY, createState(true));
  }

  @Override
  public void onDisconnect() {
    LOGGER.info("Disconnected from master");
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
  }

  @Override
  public void onMessage(RedisToken token) {
    token.accept(RedisTokenVisitor.builder()
        .onString(string -> {
          processRDB(string);
          return null;
        })
        .onArray(array -> {
          processor.processCommand(array);
          return null;
        }).build());
  }

  private void processRDB(StringRedisToken token) {
    try {
      SafeString value = token.getValue();
      server.importRDB(toStream(value));
      LOGGER.info("loaded RDB file from master");
    } catch (IOException e) {
      LOGGER.error("error importing RDB file", e);
    }
  }

  private InputStream toStream(SafeString value) {
    return new ByteBufferInputStream(value.getBytes());
  }

  private DatabaseValue createState(boolean connected) {
    return hash(entry(safeString("host"), safeString(host)),
                entry(safeString("port"), safeString(valueOf(port))),
                entry(safeString("state"), safeString(connected ? "connected" : "disconnected")));
  }
}
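  • SlaveReplication implements the RespCallback interface. Its constructor creates a RespClient and a DBCommandProcessor. start() calls client.start(), marks the server as a slave with server.setMaster(false), and records the master state as disconnected in the admin database; stop() calls client.stop() and marks the server as master again. onConnect() sends the SYNC command with client.send(array(string(SYNC_COMMAND))) and sets the master state to connected; onDisconnect() sets it back to disconnected. onMessage() dispatches on the token type: a string token goes to processRDB, which calls server.importRDB(toStream(value)), and an array token goes to processor.processCommand(array).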
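
To make the callback lifecycle easier to follow, here is a minimal, dependency-free sketch of the same flow. It is not claudb code: ReplicaSketch, its fields, and the use of a byte[] for the snapshot and a List<String> for a command are all assumptions made for illustration, and the host and port in main are arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: ReplicaSketch and its fields are hypothetical
// stand-ins, not claudb classes.
class ReplicaSketch {
  final List<String> sentToMaster = new ArrayList<>(); // commands we would send
  final List<String> applied = new ArrayList<>();      // work done locally
  final Map<String, String> masterState = new LinkedHashMap<>();
  boolean master = true;

  private final String host;
  private final int port;

  ReplicaSketch(String host, int port) {
    this.host = host;
    this.port = port;
  }

  // Become a replica; the link to the master is not up yet.
  void start() {
    master = false;
    putState(false);
  }

  // Go back to being a master.
  void stop() {
    master = true;
  }

  // Ask for a full sync as soon as the connection is established.
  void onConnect() {
    sentToMaster.add("SYNC");
    putState(true);
  }

  void onDisconnect() {
    putState(false);
  }

  // A byte[] stands in for the string token carrying the snapshot,
  // a List for an array token carrying one command.
  void onMessage(Object token) {
    if (token instanceof byte[]) {
      applied.add("import snapshot (" + ((byte[]) token).length + " bytes)");
    } else if (token instanceof List) {
      applied.add("execute " + token);
    }
  }

  // Mirrors the host/port/state hash stored under the "master" key.
  private void putState(boolean connected) {
    masterState.put("host", host);
    masterState.put("port", String.valueOf(port));
    masterState.put("state", connected ? "connected" : "disconnected");
  }

  public static void main(String[] args) {
    ReplicaSketch replica = new ReplicaSketch("localhost", 7081);
    replica.start();
    replica.onConnect();
    replica.onMessage(new byte[] {1, 2, 3});
    replica.onMessage(Arrays.asList("set", "a", "1"));
    System.out.println(replica.sentToMaster);
    System.out.println(replica.applied);
    System.out.println(replica.masterState);
  }
}
```

The sketch keeps the two message kinds on separate branches, as the visitor in onMessage does: the snapshot is imported as a whole, and every array that arrives is run as a single command.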

DBCommandProcessor

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/DBCommandProcessor.java

public class DBCommandProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(DBCommandProcessor.class);

  private final DBServerContext server;
  private final Session session;

  public DBCommandProcessor(DBServerContext server) {
    this(server, new DefaultSession("dummy", null));
  }

  public DBCommandProcessor(DBServerContext server, Session session) {
    this.server = server;
    this.session = session;
  }

  public void processCommand(ArrayRedisToken token) {
    Sequence<RedisToken> array = token.getValue();
    StringRedisToken commandToken = (StringRedisToken) array.stream().findFirst().orElse(nullString());
    List<RedisToken> paramTokens = array.stream().skip(1).collect(toList());

    LOGGER.debug("new command recieved: {}", commandToken);

    RespCommand command = server.getCommand(commandToken.getValue().toString());

    if (command != null) {
      command.execute(request(commandToken, paramTokens));
    }
  }

  private Request request(StringRedisToken commandToken, List<RedisToken> array) {
    return new DefaultRequest(server, session, commandToken.getValue(), arrayToList(array));
  }

  private ImmutableArray<SafeString> arrayToList(List<RedisToken> request) {
    RedisTokenVisitor<SafeString> visitor = RedisTokenVisitor.<SafeString>builder()
        .onString(StringRedisToken::getValue).build();
    return ImmutableArray.from(visit(request.stream(), visitor));
  }
}
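  • DBCommandProcessor's processCommand method takes the first element of the array as commandToken and the remaining elements as paramTokens, looks up the matching RespCommand with server.getCommand(commandToken.getValue().toString()), and, if one is found, runs command.execute(request(commandToken, paramTokens)). When no command is found, the array is ignored.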
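
The dispatch step can be sketched without any claudb types. DispatchSketch and its two commands below are hypothetical; the lookup is an exact match on the name, which may differ from what server.getCommand does, and unlike processCommand the sketch returns the command result so the effect is visible.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch only: DispatchSketch is hypothetical, not a claudb class.
class DispatchSketch {
  private final Map<String, Function<List<String>, String>> commands = new HashMap<>();
  private final Map<String, String> data = new HashMap<>();

  DispatchSketch() {
    commands.put("set", params -> {
      data.put(params.get(0), params.get(1));
      return "OK";
    });
    commands.put("get", params -> data.get(params.get(0)));
  }

  // First element is the command name, the rest are its parameters.
  // Returns null when the array is empty or the command is unknown.
  String process(List<String> array) {
    if (array.isEmpty()) {
      return null;
    }
    String name = array.get(0);
    List<String> params = array.subList(1, array.size());
    Function<List<String>, String> command = commands.get(name);
    return command == null ? null : command.apply(params);
  }

  public static void main(String[] args) {
    DispatchSketch dispatcher = new DispatchSketch();
    System.out.println(dispatcher.process(Arrays.asList("set", "a", "1"))); // prints OK
    System.out.println(dispatcher.process(Arrays.asList("get", "a")));      // prints 1
    System.out.println(dispatcher.process(Arrays.asList("nope")));          // prints null
  }
}
```

An unknown command falls through without an error, which matches the `if (command != null)` guard in processCommand.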

Summary

SlaveReplication implements RespCallback and owns a RespClient and a DBCommandProcessor. start() starts the client and marks the server as a slave with the master state set to disconnected; stop() stops the client and marks the server as master. On connect it sends SYNC and sets the master state to connected; on disconnect it sets the state back to disconnected. For each message, a string token is imported as an RDB snapshot through server.importRDB(toStream(value)), and an array token is executed through processor.processCommand(array).
