序
本文主要研究一下claudb的pubsub command
PublishCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PublishCommand.java
@Command("publish")@ParamLength(2)public class PublishCommand implements DBCommand, SubscriptionSupport, PatternSubscriptionSupport { @Override public RedisToken execute(Database db, Request request) { String channel = request.getParam(0).toString(); SafeString message = request.getParam(1); return integer(publishAll(getClauDB(request.getServerContext()), channel, message)); } private int publishAll(DBServerContext server, String channel, SafeString message) { int count = publish(server, channel, message); int pcount = patternPublish(server, channel, message); return count + pcount; }}
- PublishCommand实现了DBCommand, SubscriptionSupport, PatternSubscriptionSupport接口,其execute方法执行publishAll(getClauDB(request.getServerContext()), channel, message)
SubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/SubscribeCommand.java
@ReadOnly@Command("subscribe")@ParamLength(1)@PubSubAllowedpublic class SubscribeCommand implements DBCommand, SubscriptionSupport { private static final String SUBSCRIBE = "subscribe"; @Override public RedisToken execute(Database db, Request request) { Database admin = getAdminDatabase(request.getServerContext()); String sessionId = getSessionId(request); Sequence<SafeString> channels = getChannels(request); int i = channels.size(); List<Object> result = new LinkedList<>(); for (SafeString channel : request.getParams()) { addSubscription(admin, sessionId, channel); getSessionState(request.getSession()).addSubscription(channel); result.addAll(asList(SUBSCRIBE, channel, ++i)); } return convert(result); } private String getSessionId(Request request) { return request.getSession().getId(); } private Sequence<SafeString> getChannels(Request request) { return getSessionState(request.getSession()).getSubscriptions(); }}
- SubscribeCommand实现了DBCommand, SubscriptionSupport接口,其execute方法遍历channel挨个执行addSubscription及getSessionState(request.getSession()).addSubscription(channel)
UnsubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/UnsubscribeCommand.java
@ReadOnly@Command("unsubscribe")@ParamLength(1)@PubSubAllowedpublic class UnsubscribeCommand implements DBCommand, SubscriptionSupport { private static final String UNSUBSCRIBE = "unsubscribe"; @Override public RedisToken execute(Database db, Request request) { Database admin = getAdminDatabase(request.getServerContext()); String sessionId = getSessionId(request); Sequence<SafeString> channels = getChannels(request); int i = channels.size(); List<Object> result = new LinkedList<>(); for (SafeString channel : request.getParams()) { removeSubscription(admin, sessionId, channel); getSessionState(request.getSession()).removeSubscription(channel); result.addAll(asList(UNSUBSCRIBE, channel, --i)); } return convert(result); } private String getSessionId(Request request) { return request.getSession().getId(); } private Sequence<SafeString> getChannels(Request request) { return getSessionState(request.getSession()).getSubscriptions(); }}
- UnsubscribeCommand实现了DBCommand, SubscriptionSupport接口,其execute方法遍历channel挨个执行removeSubscription(admin, sessionId, channel)及getSessionState(request.getSession()).removeSubscription(channel)
PatternSubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternSubscribeCommand.java
@ReadOnly@Command("psubscribe")@ParamLength(1)@PubSubAllowedpublic class PatternSubscribeCommand implements DBCommand, PatternSubscriptionSupport { private static final String PSUBSCRIBE = "psubscribe"; @Override public RedisToken execute(Database db, Request request) { Database admin = getAdminDatabase(request.getServerContext()); String sessionId = getSessionId(request); Sequence<SafeString> channels = getChannels(request); int i = channels.size(); List<Object> result = new LinkedList<>(); for (SafeString pattern : request.getParams()) { addPatternSubscription(admin, sessionId, pattern); getSessionState(request.getSession()).addSubscription(pattern); result.addAll(asList(PSUBSCRIBE, pattern, ++i)); } return convert(result); } private String getSessionId(Request request) { return request.getSession().getId(); } private Sequence<SafeString> getChannels(Request request) { return getSessionState(request.getSession()).getSubscriptions(); }}
- PatternSubscribeCommand实现了DBCommand, PatternSubscriptionSupport接口,其execute方法遍历pattern挨个执行addPatternSubscription(admin, sessionId, pattern)及getSessionState(request.getSession()).addSubscription(pattern)
PatternUnsubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternUnsubscribeCommand.java
@ReadOnly@Command("punsubscribe")@ParamLength(1)@PubSubAllowedpublic class PatternUnsubscribeCommand implements DBCommand, PatternSubscriptionSupport { private static final String PUNSUBSCRIBE = "punsubscribe"; @Override public RedisToken execute(Database db, Request request) { Database admin = getAdminDatabase(request.getServerContext()); String sessionId = getSessionId(request); Sequence<SafeString> channels = getChannels(request); int i = channels.size(); List<Object> result = new LinkedList<>(); for (SafeString channel : request.getParams()) { removePatternSubscription(admin, sessionId, channel); getSessionState(request.getSession()).removeSubscription(channel); result.addAll(asList(PUNSUBSCRIBE, channel, --i)); } return convert(result); } private String getSessionId(Request request) { return request.getSession().getId(); } private Sequence<SafeString> getChannels(Request request) { return getSessionState(request.getSession()).getSubscriptions(); }}
- PatternUnsubscribeCommand实现了DBCommand, PatternSubscriptionSupport接口,其execute方法遍历channel挨个执行removePatternSubscription(admin, sessionId, channel)及getSessionState(request.getSession()).removeSubscription(channel)
小结
claudb pubsub相关的command有PublishCommand、SubscribeCommand、UnsubscribeCommand、PatternSubscribeCommand、PatternUnsubscribeCommand
doc
- command/pubsub