序
本文主要研究一下 claudb 的 SlaveReplication
SlaveReplication
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/SlaveReplication.java
/**
 * Replication client used when this server acts as a slave.
 *
 * <p>It owns a {@link RespClient} pointed at the master and receives its
 * connection events and messages through {@link RespCallback}. The current
 * link status (host, port, state) is stored in the admin database under the
 * {@code "master"} key.
 */
public class SlaveReplication implements RespCallback {

  private static final DatabaseKey MASTER_KEY = safeKey("master");

  private static final Logger LOGGER = LoggerFactory.getLogger(SlaveReplication.class);

  private static final String SYNC_COMMAND = "SYNC";

  private final RespClient client;
  private final DBServerContext server;
  private final DBCommandProcessor processor;
  private final String host;
  private final int port;

  /**
   * Creates the replication client; nothing is started until {@link #start()}.
   *
   * @param server  server context that receives the imported data and state
   * @param session session used to execute the commands sent by the master
   * @param host    master host
   * @param port    master port
   */
  public SlaveReplication(DBServerContext server, Session session, String host, int port) {
    this.server = server;
    this.host = host;
    this.port = port;
    this.client = new RespClient(host, port, this);
    this.processor = new DBCommandProcessor(server, session);
  }

  /**
   * Marks the server as non-master, records the "disconnected" state and then
   * starts the client.
   */
  public void start() {
    // Publish the role and the initial state BEFORE starting the client:
    // onConnect() writes the "connected" state from a client callback, and if
    // that callback runs before start() has finished, a later write of the
    // initial state here would overwrite it with "disconnected".
    // NOTE(review): assumes RespClient connects asynchronously - confirm.
    server.setMaster(false);
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
    client.start();
  }

  /** Stops the client and flags the server as master again. */
  public void stop() {
    client.stop();
    server.setMaster(true);
  }

  /** Sends the SYNC command to the master and records the "connected" state. */
  @Override
  public void onConnect() {
    LOGGER.info("Connected with master");
    client.send(array(string(SYNC_COMMAND)));
    server.getAdminDatabase().put(MASTER_KEY, createState(true));
  }

  /** Records the "disconnected" state. */
  @Override
  public void onDisconnect() {
    LOGGER.info("Disconnected from master");
    server.getAdminDatabase().put(MASTER_KEY, createState(false));
  }

  /**
   * Dispatches a message from the master: string tokens are imported as an
   * RDB payload, array tokens are executed as commands.
   *
   * @param token message received from the master
   */
  @Override
  public void onMessage(RedisToken token) {
    token.accept(RedisTokenVisitor.builder()
        .onString(string -> {
          processRDB(string);
          return null;
        })
        .onArray(array -> {
          processor.processCommand(array);
          return null;
        })
        .build());
  }

  /** Imports the RDB payload carried by the token; an I/O failure is logged, not rethrown. */
  private void processRDB(StringRedisToken token) {
    try {
      SafeString value = token.getValue();
      server.importRDB(toStream(value));
      LOGGER.info("loaded RDB file from master");
    } catch (IOException e) {
      LOGGER.error("error importing RDB file", e);
    }
  }

  /** Wraps the bytes of the given string in an input stream. */
  private InputStream toStream(SafeString value) {
    return new ByteBufferInputStream(value.getBytes());
  }

  /** Builds the hash stored under the master key: host, port and connection state. */
  private DatabaseValue createState(boolean connected) {
    return hash(
        entry(safeString("host"), safeString(host)),
        entry(safeString("port"), safeString(valueOf(port))),
        entry(safeString("state"), safeString(connected ? "connected" : "disconnected")));
  }
}
- SlaveReplication 实现了 RespCallback 接口,其构造器创建 RespClient、DBCommandProcessor;其 start 方法执行 client.start(),标记 server 为 slave 并将 master 的连接状态记为 disconnected;其 stop 方法执行 client.stop() 并标记 server 为 master;其 onConnect 方法执行 client.send(array(string(SYNC_COMMAND))) 并将 master 的连接状态记为 connected;其 onDisconnect 方法将 master 的连接状态记为 disconnected;其 onMessage 方法在 onString 时执行 processRDB,在 onArray 时执行 processor.processCommand(array);processRDB 方法执行 server.importRDB(toStream(value))
DBCommandProcessor
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/DBCommandProcessor.java
/**
 * Executes commands that arrive as RESP array tokens against a
 * {@link DBServerContext}, using a fixed {@link Session}.
 */
public class DBCommandProcessor {

  private static final Logger LOGGER = LoggerFactory.getLogger(DBCommandProcessor.class);

  private final DBServerContext server;
  private final Session session;

  /**
   * Creates a processor bound to a placeholder session with id {@code "dummy"}.
   *
   * @param server server context used to look up and run commands
   */
  public DBCommandProcessor(DBServerContext server) {
    this(server, new DefaultSession("dummy", null));
  }

  /**
   * @param server  server context used to look up and run commands
   * @param session session passed to every executed request
   */
  public DBCommandProcessor(DBServerContext server, Session session) {
    this.server = server;
    this.session = session;
  }

  /**
   * Runs the command encoded in the given array: the first element is the
   * command name and the remaining elements are its parameters.
   *
   * <p>Arrays that are empty, or whose first element is not a string token
   * with a value, are logged and skipped instead of failing with a
   * {@code ClassCastException} or {@code NullPointerException}. Names the
   * server does not know are logged and skipped as well.
   *
   * @param token array token holding the command name and its parameters
   */
  public void processCommand(ArrayRedisToken token) {
    Sequence<RedisToken> array = token.getValue();

    RedisToken first = array.stream().findFirst().orElse(null);
    if (!(first instanceof StringRedisToken)) {
      // covers both the empty array (null) and a non-string first element
      LOGGER.debug("ignoring command without a string name: {}", first);
      return;
    }

    StringRedisToken commandToken = (StringRedisToken) first;
    if (commandToken.getValue() == null) {
      // a string token without a value cannot name a command
      LOGGER.debug("ignoring command with null name");
      return;
    }

    List<RedisToken> paramTokens = array.stream().skip(1).collect(toList());

    LOGGER.debug("new command recieved: {}", commandToken);

    RespCommand command = server.getCommand(commandToken.getValue().toString());
    if (command == null) {
      LOGGER.debug("command not found: {}", commandToken);
      return;
    }
    command.execute(request(commandToken, paramTokens));
  }

  /** Builds the request handed to the command from its name and parameter tokens. */
  private Request request(StringRedisToken commandToken, List<RedisToken> array) {
    return new DefaultRequest(server, session, commandToken.getValue(), arrayToList(array));
  }

  /** Converts the parameter tokens to their values using a visitor that only handles string tokens. */
  private ImmutableArray<SafeString> arrayToList(List<RedisToken> request) {
    RedisTokenVisitor<SafeString> visitor = RedisTokenVisitor.<SafeString>builder()
        .onString(StringRedisToken::getValue)
        .build();
    return ImmutableArray.from(visit(request.stream(), visitor));
  }
}
- DBCommandProcessor 的 processCommand 方法解析 commandToken 及 paramTokens,然后通过 server.getCommand(commandToken.getValue().toString()) 找到对应的 RespCommand,若该命令存在则执行 command.execute(request(commandToken, paramTokens))
小结
SlaveReplication 实现了 RespCallback 接口,其构造器创建 RespClient、DBCommandProcessor;其 start 方法执行 client.start(),标记 server 为 slave 并将 master 的连接状态记为 disconnected;其 stop 方法执行 client.stop() 并标记 server 为 master;其 onConnect 方法执行 client.send(array(string(SYNC_COMMAND))) 并将 master 的连接状态记为 connected;其 onDisconnect 方法将 master 的连接状态记为 disconnected;其 onMessage 方法在 onString 时执行 processRDB,在 onArray 时执行 processor.processCommand(array);processRDB 方法执行 server.importRDB(toStream(value))
doc
- SlaveReplication