本文主要研究一下claudb的pubsub command

PublishCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PublishCommand.java

@Command("publish")@ParamLength(2)public class PublishCommand implements DBCommand, SubscriptionSupport, PatternSubscriptionSupport {  @Override  public RedisToken execute(Database db, Request request) {    String channel = request.getParam(0).toString();    SafeString message = request.getParam(1);    return integer(publishAll(getClauDB(request.getServerContext()), channel, message));  }  private int publishAll(DBServerContext server, String channel, SafeString message) {    int count = publish(server, channel, message);    int pcount = patternPublish(server, channel, message);    return count + pcount;  }}
  • PublishCommand实现了DBCommand, SubscriptionSupport, PatternSubscriptionSupport接口,其execute方法执行publishAll(getClauDB(request.getServerContext()), channel, message)

SubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/SubscribeCommand.java

@ReadOnly@Command("subscribe")@ParamLength(1)@PubSubAllowedpublic class SubscribeCommand implements DBCommand, SubscriptionSupport {  private static final String SUBSCRIBE = "subscribe";  @Override  public RedisToken execute(Database db, Request request) {    Database admin = getAdminDatabase(request.getServerContext());    String sessionId = getSessionId(request);    Sequence<SafeString> channels = getChannels(request);    int i = channels.size();    List<Object> result = new LinkedList<>();    for (SafeString channel : request.getParams()) {      addSubscription(admin, sessionId, channel);      getSessionState(request.getSession()).addSubscription(channel);      result.addAll(asList(SUBSCRIBE, channel, ++i));    }    return convert(result);  }  private String getSessionId(Request request) {    return request.getSession().getId();  }  private Sequence<SafeString> getChannels(Request request) {    return getSessionState(request.getSession()).getSubscriptions();  }}
  • SubscribeCommand实现了DBCommand, SubscriptionSupport接口,其execute方法遍历channel挨个执行addSubscription及getSessionState(request.getSession()).addSubscription(channel)

UnsubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/UnsubscribeCommand.java

@ReadOnly@Command("unsubscribe")@ParamLength(1)@PubSubAllowedpublic class UnsubscribeCommand implements DBCommand, SubscriptionSupport {  private static final String UNSUBSCRIBE = "unsubscribe";  @Override  public RedisToken execute(Database db, Request request) {    Database admin = getAdminDatabase(request.getServerContext());    String sessionId = getSessionId(request);    Sequence<SafeString> channels = getChannels(request);    int i = channels.size();    List<Object> result = new LinkedList<>();    for (SafeString channel : request.getParams()) {      removeSubscription(admin, sessionId, channel);      getSessionState(request.getSession()).removeSubscription(channel);      result.addAll(asList(UNSUBSCRIBE, channel, --i));    }    return convert(result);  }  private String getSessionId(Request request) {    return request.getSession().getId();  }  private Sequence<SafeString> getChannels(Request request) {    return getSessionState(request.getSession()).getSubscriptions();  }}
  • UnsubscribeCommand实现了DBCommand, SubscriptionSupport接口,其execute方法遍历channel挨个执行removeSubscription(admin, sessionId, channel)及getSessionState(request.getSession()).removeSubscription(channel)

PatternSubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternSubscribeCommand.java

@ReadOnly@Command("psubscribe")@ParamLength(1)@PubSubAllowedpublic class PatternSubscribeCommand implements DBCommand, PatternSubscriptionSupport {  private static final String PSUBSCRIBE = "psubscribe";  @Override  public RedisToken execute(Database db, Request request) {    Database admin = getAdminDatabase(request.getServerContext());    String sessionId = getSessionId(request);    Sequence<SafeString> channels = getChannels(request);    int i = channels.size();    List<Object> result = new LinkedList<>();    for (SafeString pattern : request.getParams()) {      addPatternSubscription(admin, sessionId, pattern);      getSessionState(request.getSession()).addSubscription(pattern);      result.addAll(asList(PSUBSCRIBE, pattern, ++i));    }    return convert(result);  }  private String getSessionId(Request request) {    return request.getSession().getId();  }  private Sequence<SafeString> getChannels(Request request) {    return getSessionState(request.getSession()).getSubscriptions();  }}
  • PatternSubscribeCommand实现了DBCommand, PatternSubscriptionSupport接口,其execute方法遍历pattern挨个执行addPatternSubscription(admin, sessionId, pattern)及getSessionState(request.getSession()).addSubscription(pattern)

PatternUnsubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternUnsubscribeCommand.java

@ReadOnly@Command("punsubscribe")@ParamLength(1)@PubSubAllowedpublic class PatternUnsubscribeCommand implements DBCommand, PatternSubscriptionSupport {  private static final String PUNSUBSCRIBE = "punsubscribe";  @Override  public RedisToken execute(Database db, Request request) {    Database admin = getAdminDatabase(request.getServerContext());    String sessionId = getSessionId(request);    Sequence<SafeString> channels = getChannels(request);    int i = channels.size();    List<Object> result = new LinkedList<>();    for (SafeString channel : request.getParams()) {      removePatternSubscription(admin, sessionId, channel);      getSessionState(request.getSession()).removeSubscription(channel);      result.addAll(asList(PUNSUBSCRIBE, channel, --i));    }    return convert(result);  }  private String getSessionId(Request request) {    return request.getSession().getId();  }  private Sequence<SafeString> getChannels(Request request) {    return getSessionState(request.getSession()).getSubscriptions();  }}
  • PatternUnsubscribeCommand实现了DBCommand, PatternSubscriptionSupport接口,其execute方法遍历channel挨个执行removePatternSubscription(admin, sessionId, channel)及getSessionState(request.getSession()).removeSubscription(channel)

小结

claudb pubsub相关的command有PublishCommand、SubscribeCommand、UnsubscribeCommand、PatternSubscribeCommand、PatternUnsubscribeCommand

doc

  • command/pubsub