序
本文次要钻研一下RespServer
Resp
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/Resp.java
interface Resp { void channel(SocketChannel channel); void connected(ChannelHandlerContext ctx); void disconnected(ChannelHandlerContext ctx); void receive(ChannelHandlerContext ctx, RedisToken message);}
- Resp接口定义了channel、connected、disconnected、receive办法
RespServer
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServer.java
public class RespServer implements Resp { private static final Logger LOGGER = LoggerFactory.getLogger(RespServer.class); private static final int BUFFER_SIZE = 1024 * 1024; private static final int MAX_FRAME_SIZE = BUFFER_SIZE * 100; private static final String DEFAULT_HOST = "localhost"; private static final int DEFAULT_PORT = 12345; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private ChannelFuture future; private final RespServerContext serverContext; public RespServer(RespServerContext serverContext) { this.serverContext = requireNonNull(serverContext); } public static Builder builder() { return new Builder(); } public void start() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new RespInitializerHandler(this)) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.SO_RCVBUF, BUFFER_SIZE) .option(ChannelOption.SO_SNDBUF, BUFFER_SIZE) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); future = bootstrap.bind(serverContext.getHost(), serverContext.getPort()); // Bind and start to accept incoming connections. future.syncUninterruptibly(); serverContext.start(); LOGGER.info("server started: {}:{}", serverContext.getHost(), serverContext.getPort()); } public void stop() { try { if (future != null) { closeFuture(future.channel().close()); } future = null; } finally { workerGroup = closeWorker(workerGroup); bossGroup = closeWorker(bossGroup); } serverContext.stop(); LOGGER.info("server stopped"); } @Override public void channel(SocketChannel channel) { LOGGER.debug("new channel: {}", sourceKey(channel)); channel.pipeline().addLast("redisEncoder", new RedisEncoder()); channel.pipeline().addLast("linDelimiter", new RedisDecoder(MAX_FRAME_SIZE)); channel.pipeline().addLast(new RespConnectionHandler(this)); } @Override public void connected(ChannelHandlerContext ctx) { String sourceKey = sourceKey(ctx.channel()); LOGGER.debug("client connected: {}", sourceKey); getSession(ctx, sourceKey); } @Override public void disconnected(ChannelHandlerContext ctx) { String sourceKey = sourceKey(ctx.channel()); LOGGER.debug("client disconnected: {}", sourceKey); serverContext.removeSession(sourceKey); } @Override public void receive(ChannelHandlerContext ctx, RedisToken message) { String sourceKey = sourceKey(ctx.channel()); LOGGER.debug("message received: {}", sourceKey); parseMessage(message, getSession(ctx, sourceKey)) .ifPresent(serverContext::processCommand); } //......}
- RespServer实现了Resp接口,其start办法创立bossGroup、workerGroup,设置RespInitializerHandler为childHandler,而后执行bootstrap.bind(serverContext.getHost(), serverContext.getPort())及serverContext.start();channel设置了redisEncoder、linDelimiter、RespConnectionHandler;receive办法执行parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand)
RespInitializerHandler
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespInitializerHandler.java
class RespInitializerHandler extends ChannelInitializer<SocketChannel> { private final Resp impl; RespInitializerHandler(Resp impl) { this.impl = impl; } @Override protected void initChannel(SocketChannel channel) throws Exception { impl.channel(channel); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { impl.disconnected(ctx); }}
- RespInitializerHandler继承ChannelInitializer,其initChannel、channelInactive办法均委托给Resp的实现
RedisEncoder
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisEncoder.java
public class RedisEncoder extends MessageToByteEncoder<RedisToken> { @Override protected void encode(ChannelHandlerContext ctx, RedisToken msg, ByteBuf out) throws Exception { out.writeBytes(new RedisSerializer().encodeToken(msg)); }}
- RedisEncoder继承了MessageToByteEncoder,其encode通过RedisSerializer的encodeToken来编码RedisToken
RedisDecoder
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisDecoder.java
public class RedisDecoder extends ReplayingDecoder<Void> { private final int maxLength; public RedisDecoder(int maxLength) { this.maxLength = maxLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { out.add(parseResponse(buffer)); } private RedisToken parseResponse(ByteBuf buffer) { RedisToken token = createParser(buffer).next(); checkpoint(); return token; } private RedisParser createParser(ByteBuf buffer) { return new RedisParser(maxLength, new NettyRedisSource(this, buffer)); } //......}
- RedisDecoder继承了ReplayingDecoder,其decode通过RedisParser来解析
RespServerContext
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServerContext.java
public class RespServerContext implements ServerContext { private static final Logger LOGGER = LoggerFactory.getLogger(RespServerContext.class); private final StateHolder state = new StateHolder(); private final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>(); private final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor()); private final String host; private final int port; private final CommandSuite commands; private SessionListener sessionListener; public RespServerContext(String host, int port, CommandSuite commands) { this(host, port, commands, nullListener()); } public RespServerContext(String host, int port, CommandSuite commands, SessionListener sessionListener) { this.host = requireNonNull(host); this.port = requireRange(port, 1024, 65535); this.commands = requireNonNull(commands); this.sessionListener = sessionListener; } public void start() { } public void stop() { clear(); scheduler.shutdown(); } @Override public int getClients() { return clients.size(); } @Override public RespCommand getCommand(String name) { return commands.getCommand(name); } @Override public <T> Option<T> getValue(String key) { return state.getValue(key); } @Override public <T> Option<T> removeValue(String key) { return state.removeValue(key); } @Override public void putValue(String key, Object value) { state.putValue(key, value); } @Override public String getHost() { return host; } @Override public int getPort() { return port; } Session getSession(String sourceKey, Function<String, Session> factory) { return clients.computeIfAbsent(sourceKey, key -> { Session session = factory.apply(key); sessionListener.sessionCreated(session); return session; }); } void processCommand(Request request) { LOGGER.debug("received command: {}", request); RespCommand command = getCommand(request.getCommand()); try { executeOn(execute(command, request)) .subscribe(response -> processResponse(request, response), ex -> LOGGER.error("error executing command: " + request, ex)); } catch (RuntimeException ex) { LOGGER.error("error executing command: " + request, ex); } } protected CommandSuite getCommands() { return commands; } protected void removeSession(String sourceKey) { Session session = clients.remove(sourceKey); if (session != null) { sessionListener.sessionDeleted(session); } } protected Session getSession(String key) { return clients.get(key); } protected RedisToken executeCommand(RespCommand command, Request request) { return command.execute(request); } protected <T> Observable<T> executeOn(Observable<T> observable) { return observable.observeOn(scheduler); } private void processResponse(Request request, RedisToken token) { request.getSession().publish(token); if (request.isExit()) { request.getSession().close(); } } private Observable<RedisToken> execute(RespCommand command, Request request) { return Observable.create(observer -> { observer.onNext(executeCommand(command, request)); observer.onComplete(); }); } private int requireRange(int value, int min, int max) { if (value <= min || value > max) { throw new IllegalArgumentException(min + " <= " + value + " < " + max); } return value; } private void clear() { clients.clear(); state.clear(); }}
- RespServerContext实现了ServerContext接口,其结构器要求设置commands参数;其processCommand办法先通过getCommand(request.getCommand())获取RespCommand,之后通过executeCommand来执行,最初返回RedisToken
RedisToken
resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisToken.java
public interface RedisToken { RedisToken NULL_STRING = string((SafeString) null); RedisToken RESPONSE_OK = status("OK"); RedisTokenType getType(); <T> T accept(RedisTokenVisitor<T> visitor); static RedisToken nullString() { return NULL_STRING; } static RedisToken responseOk() { return RESPONSE_OK; } static RedisToken string(SafeString str) { return new StringRedisToken(str); } static RedisToken string(String str) { return new StringRedisToken(safeString(str)); } static RedisToken status(String str) { return new StatusRedisToken(str); } static RedisToken integer(boolean b) { return new IntegerRedisToken(b ? 1 : 0); } static RedisToken integer(int i) { return new IntegerRedisToken(i); } static RedisToken error(String str) { return new ErrorRedisToken(str); } static RedisToken array(RedisToken... redisTokens) { return new ArrayRedisToken(ImmutableList.of(redisTokens)); } static RedisToken array(Collection<RedisToken> redisTokens) { return new ArrayRedisToken(ImmutableList.from(redisTokens)); } static RedisToken array(Sequence<RedisToken> redisTokens) { return new ArrayRedisToken(redisTokens.asArray()); } static <T> Stream<T> visit(Stream<RedisToken> tokens, RedisTokenVisitor<T> visitor) { return tokens.map(token -> token.accept(visitor)); }}
- RedisToken接口定义了getType、accept办法
小结
RespServer实现了Resp接口,其start办法创立bossGroup、workerGroup,设置RespInitializerHandler为childHandler,而后执行bootstrap.bind(serverContext.getHost(), serverContext.getPort())及serverContext.start();channel设置了redisEncoder、linDelimiter、RespConnectionHandler;receive办法执行parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand)
doc
- Resp