Preface
This article takes a look at claudb's SlaveReplication.
SlaveReplication
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/SlaveReplication.java
public class SlaveReplication implements RespCallback {

  private static final DatabaseKey MASTER_KEY = safeKey("master");

  private static final Logger LOGGER = LoggerFactory.getLogger(SlaveReplication.class);

  private static final String SYNC_COMMAND = "SYNC";

  private final RespClient client;
  private final DBServerContext server;
  private final DBCommandProcessor processor;
  private final String host;
  private final int port;

  public SlaveReplication(DBServerContext server, Session session, String host, int port) {
    this.server = server;
    this.host = host;
    this.port = port;
    this.client = new RespClient(host, port, this);
    this.processor = new DBCommandProcessor(server, session);
  }

  public void start() {
    client.start();
    server.setMaster(false);
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
  }

  public void stop() {
    client.stop();
    server.setMaster(true);
  }

  @Override
  public void onConnect() {
    LOGGER.info("Connected with master");
    client.send(array(string(SYNC_COMMAND)));
    server.getAdminDatabase().put(MASTER_KEY, createState(true));
  }

  @Override
  public void onDisconnect() {
    LOGGER.info("Disconnected from master");
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
  }

  @Override
  public void onMessage(RedisToken token) {
    token.accept(RedisTokenVisitor.builder()
        .onString(string -> {
          processRDB(string);
          return null;
        })
        .onArray(array -> {
          processor.processCommand(array);
          return null;
        }).build());
  }

  private void processRDB(StringRedisToken token) {
    try {
      SafeString value = token.getValue();
      server.importRDB(toStream(value));
      LOGGER.info("loaded RDB file from master");
    } catch (IOException e) {
      LOGGER.error("error importing RDB file", e);
    }
  }

  private InputStream toStream(SafeString value) {
    return new ByteBufferInputStream(value.getBytes());
  }

  private DatabaseValue createState(boolean connected) {
    return hash(entry(safeString("host"), safeString(host)),
        entry(safeString("port"), safeString(valueOf(port))),
        entry(safeString("state"), safeString(connected ? "connected" : "disconnected")));
  }
}
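- SlaveReplication implements the RespCallback interface, and its constructor creates a RespClient and a DBCommandProcessor. The start method calls client.start(), marks the server as a slave, and stores the master state (host, port, state) in the admin database with state "disconnected"; the stop method calls client.stop() and marks the server as master again. The onConnect method sends client.send(array(string(SYNC_COMMAND))) and sets the master state to "connected"; onDisconnect sets it back to "disconnected". The onMessage method calls processRDB for a string token and processor.processCommand(array) for an array token; processRDB in turn calls server.importRDB(toStream(value)).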
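The callback flow described above can be sketched without any claudb dependency. The following is only an illustration under stated assumptions: Callback, SlaveSketch, and the sent/events/state fields are hypothetical stand-ins, a plain String plays the role of the RDB payload, and a List plays the role of a replicated command array. It is not claudb's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins only; none of these types are claudb classes.
public class ReplicationSketch {

  // Mirrors the three callbacks that SlaveReplication overrides.
  interface Callback {
    void onConnect();
    void onDisconnect();
    void onMessage(Object token);
  }

  static class SlaveSketch implements Callback {
    final List<String> sent = new ArrayList<>();   // what would be sent to the master
    final List<String> events = new ArrayList<>(); // how each incoming token was handled
    String state = "disconnected";

    @Override
    public void onConnect() {
      sent.add("SYNC");      // request a full snapshot from the master
      state = "connected";
    }

    @Override
    public void onDisconnect() {
      state = "disconnected";
    }

    @Override
    public void onMessage(Object token) {
      if (token instanceof String) {
        // A string token stands for the RDB snapshot: import it as a whole.
        events.add("importRDB:" + ((String) token).length());
      } else if (token instanceof List && !((List<?>) token).isEmpty()) {
        // An array token stands for one replicated command: hand it to the processor.
        events.add("command:" + ((List<?>) token).get(0));
      }
    }
  }

  public static void main(String[] args) {
    SlaveSketch slave = new SlaveSketch();
    slave.onConnect();
    slave.onMessage("fake-rdb-bytes");
    slave.onMessage(Arrays.asList("SET", "a", "1"));
    slave.onDisconnect();
    System.out.println(slave.sent + " " + slave.events + " " + slave.state);
  }
}
```

The point of the sketch is the split in onMessage: the first reply after SYNC is a single bulk payload that replaces local state, while every later array is an individual command to replay.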
DBCommandProcessor
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/DBCommandProcessor.java
public class DBCommandProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(DBCommandProcessor.class);

  private final DBServerContext server;
  private final Session session;

  public DBCommandProcessor(DBServerContext server) {
    this(server, new DefaultSession("dummy", null));
  }

  public DBCommandProcessor(DBServerContext server, Session session) {
    this.server = server;
    this.session = session;
  }

  public void processCommand(ArrayRedisToken token) {
    Sequence<RedisToken> array = token.getValue();
    StringRedisToken commandToken = (StringRedisToken) array.stream().findFirst().orElse(nullString());
    List<RedisToken> paramTokens = array.stream().skip(1).collect(toList());

    LOGGER.debug("new command recieved: {}", commandToken);

    RespCommand command = server.getCommand(commandToken.getValue().toString());

    if (command != null) {
      command.execute(request(commandToken, paramTokens));
    }
  }

  private Request request(StringRedisToken commandToken, List<RedisToken> array) {
    return new DefaultRequest(server, session, commandToken.getValue(), arrayToList(array));
  }

  private ImmutableArray<SafeString> arrayToList(List<RedisToken> request) {
    RedisTokenVisitor<SafeString> visitor = RedisTokenVisitor.<SafeString>builder()
        .onString(StringRedisToken::getValue).build();
    return ImmutableArray.from(visit(request.stream(), visitor));
  }
}
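- DBCommandProcessor's processCommand method takes the first element of the array as commandToken and the remaining elements as paramTokens, looks up the matching RespCommand via server.getCommand(commandToken.getValue().toString()), and, if one is found, runs command.execute(request(commandToken, paramTokens)). A command that cannot be found is silently skipped.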
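The lookup-then-execute pattern in processCommand can be illustrated with a small self-contained sketch. Everything here is an assumption made for illustration: DispatchSketch, its Command interface, register, and process are hypothetical names, tokens are plain strings, and the registry is an exact-match HashMap, which may differ from how claudb resolves command names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of name-based command dispatch; not claudb's API.
public class DispatchSketch {

  interface Command {
    void execute(List<String> params);
  }

  private final Map<String, Command> commands = new HashMap<>();

  void register(String name, Command command) {
    commands.put(name, command);
  }

  // Returns true when a command was found and executed, false otherwise.
  boolean process(List<String> tokens) {
    if (tokens.isEmpty()) {
      return false; // nothing to dispatch
    }
    String name = tokens.get(0);                             // first token: command name
    List<String> params = tokens.subList(1, tokens.size());  // remaining tokens: parameters
    Command command = commands.get(name);
    if (command == null) {
      return false; // unknown commands are skipped, as in processCommand
    }
    command.execute(params);
    return true;
  }

  public static void main(String[] args) {
    Map<String, String> store = new HashMap<>();
    DispatchSketch processor = new DispatchSketch();
    processor.register("set", params -> store.put(params.get(0), params.get(1)));

    System.out.println(processor.process(Arrays.asList("set", "a", "1")));
    System.out.println(processor.process(Arrays.asList("unknown")));
    System.out.println(store);
  }
}
```

Returning a boolean is a choice made for this sketch so that the skipped case is observable; the original method returns void and simply does nothing when the command is missing.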
Summary
SlaveReplication implements the RespCallback interface, and its constructor creates a RespClient and a DBCommandProcessor. start calls client.start(), marks the server as a slave, and records the master connection as disconnected; stop calls client.stop() and marks the server as master. onConnect sends client.send(array(string(SYNC_COMMAND))) and records the master connection as connected; onDisconnect records it as disconnected. onMessage calls processRDB on a string token and processor.processCommand(array) on an array token; processRDB calls server.importRDB(toStream(value)).
doc
- SlaveReplication