This article takes a look at RespServer.

Resp

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/Resp.java

    interface Resp {
      void channel(SocketChannel channel);
      void connected(ChannelHandlerContext ctx);
      void disconnected(ChannelHandlerContext ctx);
      void receive(ChannelHandlerContext ctx, RedisToken message);
    }

  • The Resp interface declares four methods: channel, connected, disconnected and receive

RespServer

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServer.java

    public class RespServer implements Resp {

      private static final Logger LOGGER = LoggerFactory.getLogger(RespServer.class);

      private static final int BUFFER_SIZE = 1024 * 1024;
      private static final int MAX_FRAME_SIZE = BUFFER_SIZE * 100;

      private static final String DEFAULT_HOST = "localhost";
      private static final int DEFAULT_PORT = 12345;

      private EventLoopGroup bossGroup;
      private EventLoopGroup workerGroup;
      private ChannelFuture future;

      private final RespServerContext serverContext;

      public RespServer(RespServerContext serverContext) {
        this.serverContext = requireNonNull(serverContext);
      }

      public static Builder builder() {
        return new Builder();
      }

      public void start() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new RespInitializerHandler(this))
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
            .option(ChannelOption.SO_SNDBUF, BUFFER_SIZE)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        future = bootstrap.bind(serverContext.getHost(), serverContext.getPort());
        // Bind and start to accept incoming connections.
        future.syncUninterruptibly();

        serverContext.start();

        LOGGER.info("server started: {}:{}", serverContext.getHost(), serverContext.getPort());
      }

      public void stop() {
        try {
          if (future != null) {
            closeFuture(future.channel().close());
          }
          future = null;
        } finally {
          workerGroup = closeWorker(workerGroup);
          bossGroup = closeWorker(bossGroup);
        }

        serverContext.stop();

        LOGGER.info("server stopped");
      }

      @Override
      public void channel(SocketChannel channel) {
        LOGGER.debug("new channel: {}", sourceKey(channel));

        channel.pipeline().addLast("redisEncoder", new RedisEncoder());
        channel.pipeline().addLast("linDelimiter", new RedisDecoder(MAX_FRAME_SIZE));
        channel.pipeline().addLast(new RespConnectionHandler(this));
      }

      @Override
      public void connected(ChannelHandlerContext ctx) {
        String sourceKey = sourceKey(ctx.channel());

        LOGGER.debug("client connected: {}", sourceKey);

        getSession(ctx, sourceKey);
      }

      @Override
      public void disconnected(ChannelHandlerContext ctx) {
        String sourceKey = sourceKey(ctx.channel());

        LOGGER.debug("client disconnected: {}", sourceKey);

        serverContext.removeSession(sourceKey);
      }

      @Override
      public void receive(ChannelHandlerContext ctx, RedisToken message) {
        String sourceKey = sourceKey(ctx.channel());

        LOGGER.debug("message received: {}", sourceKey);

        parseMessage(message, getSession(ctx, sourceKey))
          .ifPresent(serverContext::processCommand);
      }

      //......
    }

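  • RespServer implements the Resp interface. Its start method creates bossGroup and workerGroup, installs RespInitializerHandler as the childHandler, then calls bootstrap.bind(serverContext.getHost(), serverContext.getPort()) followed by serverContext.start(). The channel method adds redisEncoder, linDelimiter (which, despite its name, is a RedisDecoder) and RespConnectionHandler to the pipeline; the receive method runs parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand)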
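
The receive path boils down to "turn the incoming token into a request if possible, then hand it to the context". The following is a minimal, dependency-free sketch of that Optional-based flow. ReceiveFlowSketch, its Request class and this parseMessage are hypothetical stand-ins (the real parseMessage is elided in the listing above), and the rule that an empty message yields no request is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

final class ReceiveFlowSketch {

  /** Hypothetical stand-in for the library's Request: a command name plus its parameters. */
  static final class Request {
    final String command;
    final List<String> params;

    Request(String command, List<String> params) {
      this.command = command;
      this.params = params;
    }
  }

  /** Everything the stand-in "server context" was asked to process, in arrival order. */
  final List<String> processed = new ArrayList<>();

  /** Assumption: an empty message yields no request; otherwise the first element names the command. */
  static Optional<Request> parseMessage(List<String> message) {
    if (message.isEmpty()) {
      return Optional.empty();
    }
    String name = message.get(0).toLowerCase(Locale.ROOT);
    return Optional.of(new Request(name, message.subList(1, message.size())));
  }

  /** Plays the role of serverContext::processCommand. */
  void processCommand(Request request) {
    processed.add(request.command + request.params);
  }

  /** Same shape as RespServer.receive: parse, then dispatch only if a request came out. */
  void receive(List<String> message) {
    parseMessage(message).ifPresent(this::processCommand);
  }
}
```

With this shape, a message that cannot be turned into a request is silently dropped rather than reaching the command layer.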

RespInitializerHandler

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespInitializerHandler.java

    class RespInitializerHandler extends ChannelInitializer<SocketChannel> {

      private final Resp impl;

      RespInitializerHandler(Resp impl) {
        this.impl = impl;
      }

      @Override
      protected void initChannel(SocketChannel channel) throws Exception {
        impl.channel(channel);
      }

      @Override
      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        impl.disconnected(ctx);
      }
    }

  • RespInitializerHandler extends ChannelInitializer; its initChannel and channelInactive methods delegate to the Resp implementation (impl.channel and impl.disconnected respectively)

RedisEncoder

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisEncoder.java

    public class RedisEncoder extends MessageToByteEncoder<RedisToken> {
      @Override
      protected void encode(ChannelHandlerContext ctx, RedisToken msg, ByteBuf out) throws Exception {
        out.writeBytes(new RedisSerializer().encodeToken(msg));
      }
    }

  • RedisEncoder extends MessageToByteEncoder; its encode method serializes the RedisToken with RedisSerializer's encodeToken and writes the resulting bytes into the outgoing ByteBuf
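
To make the encoder's job concrete, here is a rough sketch of what tokens look like on the wire in the RESP (version 2) format. It is not the RedisSerializer implementation, which is not shown in this article; RespEncodeSketch and its method names are made up for illustration, and strings are used instead of byte buffers to keep it short.

```java
import java.nio.charset.StandardCharsets;

final class RespEncodeSketch {

  private static final String CRLF = "\r\n";

  /** Simple string (status) reply, e.g. +OK */
  static String status(String text) {
    return "+" + text + CRLF;
  }

  /** Error reply, e.g. -ERR unknown command */
  static String error(String text) {
    return "-" + text + CRLF;
  }

  /** Integer reply, e.g. :1 */
  static String integer(long value) {
    return ":" + value + CRLF;
  }

  /** Bulk string: the length prefix counts bytes, not characters; null becomes $-1. */
  static String bulkString(String text) {
    if (text == null) {
      return "$-1" + CRLF;
    }
    int length = text.getBytes(StandardCharsets.UTF_8).length;
    return "$" + length + CRLF + text + CRLF;
  }

  /** Array: element count followed by the already-encoded elements. */
  static String array(String... encodedElements) {
    StringBuilder out = new StringBuilder("*").append(encodedElements.length).append(CRLF);
    for (String element : encodedElements) {
      out.append(element);
    }
    return out.toString();
  }
}
```

For example, a client command such as GET k travels as array(bulkString("GET"), bulkString("k")), that is `*2\r\n$3\r\nGET\r\n$1\r\nk\r\n`.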

RedisDecoder

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisDecoder.java

    public class RedisDecoder extends ReplayingDecoder<Void> {

      private final int maxLength;

      public RedisDecoder(int maxLength) {
        this.maxLength = maxLength;
      }

      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        out.add(parseResponse(buffer));
      }

      private RedisToken parseResponse(ByteBuf buffer) {
        RedisToken token = createParser(buffer).next();
        checkpoint();
        return token;
      }

      private RedisParser createParser(ByteBuf buffer) {
        return new RedisParser(maxLength, new NettyRedisSource(this, buffer));
      }

      //......
    }

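  • RedisDecoder extends ReplayingDecoder; its decode method builds a RedisParser over the incoming buffer (bounded by maxLength), reads the next RedisToken from it and then calls checkpoint()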
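
The point of ReplayingDecoder plus checkpoint() is that the parser can be written as if the whole frame had already arrived: when data runs out mid-frame, decoding is abandoned and replayed from the last checkpoint once more bytes come in. The sketch below only imitates that idea in plain Java; it is not Netty's mechanism nor the library's RedisParser. RespDecodeSketch and Incomplete are hypothetical names, only the `+`, `:`, `$` and `*` types are handled, and the input is assumed to be ASCII so that character counts equal byte counts.

```java
import java.util.ArrayList;
import java.util.List;

final class RespDecodeSketch {

  /** Signals that the buffer ended before a full frame was available. */
  static final class Incomplete extends RuntimeException {
  }

  private final StringBuilder buffer = new StringBuilder();
  private int checkpoint = 0; // start of the first frame not yet fully decoded
  private int position = 0;   // read cursor of the current parse attempt

  /** Appends newly received data and returns every frame that is now complete. */
  List<Object> feed(String chunk) {
    buffer.append(chunk);
    List<Object> out = new ArrayList<>();
    while (true) {
      position = checkpoint;            // replay from the last checkpoint
      try {
        Object token = parse();
        checkpoint = position;          // frame fully read: move the checkpoint
        out.add(token);
      } catch (Incomplete e) {
        buffer.delete(0, checkpoint);   // drop consumed frames, keep the partial one
        checkpoint = 0;
        return out;
      }
    }
  }

  private Object parse() {
    String line = readLine();
    if (line.isEmpty()) {
      throw new IllegalArgumentException("empty frame header");
    }
    String rest = line.substring(1);
    switch (line.charAt(0)) {
      case '+': return rest;
      case ':': return Long.valueOf(rest);
      case '$': return readBulk(Integer.parseInt(rest));
      case '*': {
        int count = Integer.parseInt(rest);
        List<Object> items = new ArrayList<>();
        for (int i = 0; i < count; i++) {
          items.add(parse());
        }
        return items;
      }
      default: throw new IllegalArgumentException("unsupported type: " + line.charAt(0));
    }
  }

  private String readLine() {
    int end = buffer.indexOf("\r\n", position);
    if (end < 0) {
      throw new Incomplete();
    }
    String line = buffer.substring(position, end);
    position = end + 2;
    return line;
  }

  private String readBulk(int length) {
    if (length < 0) {
      return null; // $-1 is the null bulk string
    }
    if (position + length + 2 > buffer.length()) {
      throw new Incomplete();
    }
    String text = buffer.substring(position, position + length);
    position += length + 2; // skip the payload and its trailing CRLF
    return text;
  }
}
```

Feeding `*2\r\n$3\r\nGE` yields nothing; feeding the rest of the frame afterwards yields the list [GET, k] in one piece, which is the behaviour the checkpoint is there to provide.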

RespServerContext

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServerContext.java

    public class RespServerContext implements ServerContext {

      private static final Logger LOGGER = LoggerFactory.getLogger(RespServerContext.class);

      private final StateHolder state = new StateHolder();
      private final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();
      private final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

      private final String host;
      private final int port;
      private final CommandSuite commands;
      private SessionListener sessionListener;

      public RespServerContext(String host, int port, CommandSuite commands) {
        this(host, port, commands, nullListener());
      }

      public RespServerContext(String host, int port, CommandSuite commands,
                               SessionListener sessionListener) {
        this.host = requireNonNull(host);
        this.port = requireRange(port, 1024, 65535);
        this.commands = requireNonNull(commands);
        this.sessionListener = sessionListener;
      }

      public void start() {
      }

      public void stop() {
        clear();
        scheduler.shutdown();
      }

      @Override
      public int getClients() {
        return clients.size();
      }

      @Override
      public RespCommand getCommand(String name) {
        return commands.getCommand(name);
      }

      @Override
      public <T> Option<T> getValue(String key) {
        return state.getValue(key);
      }

      @Override
      public <T> Option<T> removeValue(String key) {
        return state.removeValue(key);
      }

      @Override
      public void putValue(String key, Object value) {
        state.putValue(key, value);
      }

      @Override
      public String getHost() {
        return host;
      }

      @Override
      public int getPort() {
        return port;
      }

      Session getSession(String sourceKey, Function<String, Session> factory) {
        return clients.computeIfAbsent(sourceKey, key -> {
          Session session = factory.apply(key);
          sessionListener.sessionCreated(session);
          return session;
        });
      }

      void processCommand(Request request) {
        LOGGER.debug("received command: {}", request);

        RespCommand command = getCommand(request.getCommand());
        try {
          executeOn(execute(command, request))
            .subscribe(response -> processResponse(request, response),
                       ex -> LOGGER.error("error executing command: " + request, ex));
        } catch (RuntimeException ex) {
          LOGGER.error("error executing command: " + request, ex);
        }
      }

      protected CommandSuite getCommands() {
        return commands;
      }

      protected void removeSession(String sourceKey) {
        Session session = clients.remove(sourceKey);
        if (session != null) {
          sessionListener.sessionDeleted(session);
        }
      }

      protected Session getSession(String key) {
        return clients.get(key);
      }

      protected RedisToken executeCommand(RespCommand command, Request request) {
        return command.execute(request);
      }

      protected <T> Observable<T> executeOn(Observable<T> observable) {
        return observable.observeOn(scheduler);
      }

      private void processResponse(Request request, RedisToken token) {
        request.getSession().publish(token);
        if (request.isExit()) {
          request.getSession().close();
        }
      }

      private Observable<RedisToken> execute(RespCommand command, Request request) {
        return Observable.create(observer -> {
          observer.onNext(executeCommand(command, request));
          observer.onComplete();
        });
      }

      private int requireRange(int value, int min, int max) {
        if (value <= min || value > max) {
          throw new IllegalArgumentException(min + " <= " + value + " < " + max);
        }
        return value;
      }

      private void clear() {
        clients.clear();
        state.clear();
      }
    }

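  • RespServerContext implements the ServerContext interface. Its constructor requires a non-null host and commands, and validates the port with requireRange, which as written rejects 1024 itself and accepts 1025 through 65535. Its processCommand method (which returns void) looks up the RespCommand via getCommand(request.getCommand()), runs it through executeCommand to obtain a RedisToken, and, because executeOn applies observeOn(scheduler), delivers that token on the context's single-thread scheduler, where processResponse publishes it to the request's session and closes the session if the request is an exit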
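
The following sketch mirrors two ideas from RespServerContext using only the JDK: sessions are created lazily and exactly once per source key via computeIfAbsent, and the response is published on a dedicated single thread. CompletableFuture stands in for the RxJava Observable, a plain list of strings stands in for Session, and the "unknown command" fallback is an assumption (how CommandSuite handles unknown names is not shown here). All names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class CommandDispatchSketch {

  private final Map<String, Function<String, String>> commands = new ConcurrentHashMap<>();
  // A "session" here is just the list of responses published to that client.
  private final Map<String, List<String>> sessions = new ConcurrentHashMap<>();
  private final AtomicInteger sessionsCreated = new AtomicInteger();
  // Daemon thread so a forgotten stop() cannot keep the JVM alive.
  private final ExecutorService scheduler = Executors.newSingleThreadExecutor(task -> {
    Thread thread = new Thread(task, "resp-scheduler-sketch");
    thread.setDaemon(true);
    return thread;
  });

  void register(String name, Function<String, String> command) {
    commands.put(name, command);
  }

  /** Like getSession: the factory and the "listener" run only when the key is new. */
  List<String> getSession(String sourceKey) {
    return sessions.computeIfAbsent(sourceKey, key -> {
      sessionsCreated.incrementAndGet(); // plays the role of sessionListener.sessionCreated
      return new CopyOnWriteArrayList<>();
    });
  }

  /** Runs the command on the calling thread, then publishes the response on the scheduler. */
  CompletableFuture<Void> processCommand(String sourceKey, String name, String argument) {
    // Assumption: unknown names produce an error reply instead of failing.
    Function<String, String> command =
        commands.getOrDefault(name, arg -> "-ERR unknown command '" + name + "'");
    List<String> session = getSession(sourceKey);
    String response = command.apply(argument);
    return CompletableFuture.completedFuture(response)
        .thenAcceptAsync(token -> session.add(token), scheduler);
  }

  int sessionsCreated() {
    return sessionsCreated.get();
  }

  void stop() {
    sessions.clear();
    scheduler.shutdown();
  }
}
```

Funnelling every publish through one thread means responses are handed to sessions one at a time, at the cost of a slow publish delaying the ones queued behind it.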

RedisToken

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisToken.java

    public interface RedisToken {

      RedisToken NULL_STRING = string((SafeString) null);
      RedisToken RESPONSE_OK = status("OK");

      RedisTokenType getType();

      <T> T accept(RedisTokenVisitor<T> visitor);

      static RedisToken nullString() {
        return NULL_STRING;
      }

      static RedisToken responseOk() {
        return RESPONSE_OK;
      }

      static RedisToken string(SafeString str) {
        return new StringRedisToken(str);
      }

      static RedisToken string(String str) {
        return new StringRedisToken(safeString(str));
      }

      static RedisToken status(String str) {
        return new StatusRedisToken(str);
      }

      static RedisToken integer(boolean b) {
        return new IntegerRedisToken(b ? 1 : 0);
      }

      static RedisToken integer(int i) {
        return new IntegerRedisToken(i);
      }

      static RedisToken error(String str) {
        return new ErrorRedisToken(str);
      }

      static RedisToken array(RedisToken... redisTokens) {
        return new ArrayRedisToken(ImmutableList.of(redisTokens));
      }

      static RedisToken array(Collection<RedisToken> redisTokens) {
        return new ArrayRedisToken(ImmutableList.from(redisTokens));
      }

      static RedisToken array(Sequence<RedisToken> redisTokens) {
        return new ArrayRedisToken(redisTokens.asArray());
      }

      static <T> Stream<T> visit(Stream<RedisToken> tokens, RedisTokenVisitor<T> visitor) {
        return tokens.map(token -> token.accept(visitor));
      }
    }

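  • The RedisToken interface declares the getType and accept methods, and also provides static factory methods (string, status, integer, error, array) for building tokens plus a visit helper that applies a RedisTokenVisitor to a stream of tokens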
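
The accept method is the classic visitor pattern: each token type calls back the visitor method that matches it, so new operations over tokens can be added without touching the token classes. A pared-down sketch of that shape follows. Token, Visitor and RENDER are hypothetical and much smaller than the library's RedisToken and RedisTokenVisitor, whose method names are not shown in this article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class TokenVisitorSketch {

  /** One callback per token kind; T is whatever the operation produces. */
  interface Visitor<T> {
    T string(String value);
    T integer(int value);
    T array(List<Token> items);
  }

  interface Token {
    <T> T accept(Visitor<T> visitor);
  }

  static Token string(String value) {
    return new Token() {
      @Override
      public <T> T accept(Visitor<T> visitor) {
        return visitor.string(value);
      }
    };
  }

  static Token integer(int value) {
    return new Token() {
      @Override
      public <T> T accept(Visitor<T> visitor) {
        return visitor.integer(value);
      }
    };
  }

  static Token array(Token... items) {
    List<Token> list = Arrays.asList(items);
    return new Token() {
      @Override
      public <T> T accept(Visitor<T> visitor) {
        return visitor.array(list);
      }
    };
  }

  /** An example operation: render a token tree as readable text. */
  static final Visitor<String> RENDER = new Visitor<String>() {
    @Override
    public String string(String value) {
      return "\"" + value + "\"";
    }

    @Override
    public String integer(int value) {
      return Integer.toString(value);
    }

    @Override
    public String array(List<Token> items) {
      // Recurse into nested tokens with the same visitor.
      return items.stream()
          .map(item -> item.accept(this))
          .collect(Collectors.joining(", ", "[", "]"));
    }
  };
}
```

A serializer, a pretty-printer or a converter to plain Java values could each be written as one more Visitor implementation over the same tokens.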

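Summary

RespServer implements the Resp interface. Its start method creates bossGroup and workerGroup, installs RespInitializerHandler as the childHandler, then calls bootstrap.bind(serverContext.getHost(), serverContext.getPort()) followed by serverContext.start(). Its channel method adds redisEncoder, linDelimiter and RespConnectionHandler to the pipeline, and its receive method runs parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand), handing each parsed request to RespServerContext for execution.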

doc

  • Resp