Preface
This article takes a look at claudb's pubsub commands.
PublishCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PublishCommand.java
@Command("publish")
@ParamLength(2)
public class PublishCommand implements DBCommand, SubscriptionSupport, PatternSubscriptionSupport {

    @Override
    public RedisToken execute(Database db, Request request) {
        String channel = request.getParam(0).toString();
        SafeString message = request.getParam(1);
        return integer(publishAll(getClauDB(request.getServerContext()), channel, message));
    }

    private int publishAll(DBServerContext server, String channel, SafeString message) {
        int count = publish(server, channel, message);
        int pcount = patternPublish(server, channel, message);
        return count + pcount;
    }
}
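- PublishCommand implements the DBCommand, SubscriptionSupport, and PatternSubscriptionSupport interfaces. Its execute method calls publishAll(getClauDB(request.getServerContext()), channel, message), which returns the sum of the counts reported by publish and patternPublish.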
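The way publishAll adds the two counts can be illustrated with a small in-memory model. This is only a sketch under assumptions: the class PublishSketch, its maps, the inbox, and the glob helper are hypothetical names invented for illustration, and the real publish and patternPublish methods live in SubscriptionSupport and PatternSubscriptionSupport, whose bodies are not quoted in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical in-memory model; not claudb's actual implementation.
final class PublishSketch {
    // channel name -> ids of sessions subscribed to exactly that channel
    private final Map<String, Set<String>> channelSubscribers = new HashMap<>();
    // glob pattern -> ids of sessions subscribed to that pattern
    private final Map<String, Set<String>> patternSubscribers = new HashMap<>();
    // session id -> messages delivered so far (stand-in for writing to a client connection)
    private final Map<String, List<String>> inbox = new HashMap<>();

    void subscribe(String sessionId, String channel) {
        channelSubscribers.computeIfAbsent(channel, k -> new HashSet<>()).add(sessionId);
    }

    void patternSubscribe(String sessionId, String glob) {
        patternSubscribers.computeIfAbsent(glob, k -> new HashSet<>()).add(sessionId);
    }

    // Delivers to exact-channel subscribers and returns how many there were.
    int publish(String channel, String message) {
        Set<String> sessions = channelSubscribers.getOrDefault(channel, Collections.emptySet());
        sessions.forEach(id -> deliver(id, message));
        return sessions.size();
    }

    // Delivers to every session whose pattern matches the channel and returns the number of deliveries.
    int patternPublish(String channel, String message) {
        int count = 0;
        for (Map.Entry<String, Set<String>> entry : patternSubscribers.entrySet()) {
            if (globMatches(entry.getKey(), channel)) {
                entry.getValue().forEach(id -> deliver(id, message));
                count += entry.getValue().size();
            }
        }
        return count;
    }

    // Mirrors the shape of the quoted publishAll: direct count plus pattern count.
    int publishAll(String channel, String message) {
        return publish(channel, message) + patternPublish(channel, message);
    }

    List<String> received(String sessionId) {
        return inbox.getOrDefault(sessionId, Collections.emptyList());
    }

    private void deliver(String sessionId, String message) {
        inbox.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(message);
    }

    // Minimal glob (an assumption): '*' matches any run of characters, '?' exactly one, the rest is literal.
    static boolean globMatches(String glob, String text) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(text).matches();
    }
}
```

In this sketch a session that is subscribed to a channel directly and also through a matching pattern is counted twice, which follows from simply summing the two counts.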
SubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/SubscribeCommand.java
@ReadOnly
@Command("subscribe")
@ParamLength(1)
@PubSubAllowed
public class SubscribeCommand implements DBCommand, SubscriptionSupport {

    private static final String SUBSCRIBE = "subscribe";

    @Override
    public RedisToken execute(Database db, Request request) {
        Database admin = getAdminDatabase(request.getServerContext());
        String sessionId = getSessionId(request);
        Sequence<SafeString> channels = getChannels(request);
        int i = channels.size();
        List<Object> result = new LinkedList<>();
        for (SafeString channel : request.getParams()) {
            addSubscription(admin, sessionId, channel);
            getSessionState(request.getSession()).addSubscription(channel);
            result.addAll(asList(SUBSCRIBE, channel, ++i));
        }
        return convert(result);
    }

    private String getSessionId(Request request) {
        return request.getSession().getId();
    }

    private Sequence<SafeString> getChannels(Request request) {
        return getSessionState(request.getSession()).getSubscriptions();
    }
}
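- SubscribeCommand implements the DBCommand and SubscriptionSupport interfaces. Its execute method iterates over the channels passed as request parameters and, for each one, calls addSubscription(admin, sessionId, channel) and getSessionState(request.getSession()).addSubscription(channel). Each channel contributes a (subscribe, channel, count) triple to the reply, where the count starts from the number of subscriptions the session already holds.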
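The reply layout built in that loop can be reproduced in isolation. The following is a minimal sketch, assuming a plain Set<String> in place of the session state and String in place of SafeString; SubscribeReplySketch and its methods are hypothetical names, and how claudb's convert turns the list into a RedisToken is not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one client session; not claudb's actual class.
final class SubscribeReplySketch {
    // Channels this session is currently subscribed to.
    private final Set<String> subscriptions = new LinkedHashSet<>();

    // Builds a flat reply list: one (command, channel, running count) triple per channel.
    List<Object> subscribe(String... channels) {
        // The counter starts at the number of subscriptions held before this call.
        int i = subscriptions.size();
        List<Object> reply = new ArrayList<>();
        for (String channel : channels) {
            subscriptions.add(channel);
            reply.addAll(Arrays.asList("subscribe", channel, ++i));
        }
        return reply;
    }

    int subscriptionCount() {
        return subscriptions.size();
    }
}
```

On a fresh session, subscribe("a", "b") yields [subscribe, a, 1, subscribe, b, 2], and a later subscribe("c") yields [subscribe, c, 3]. Because the counter is incremented once per parameter, repeating a channel name would still bump it in this sketch even though the set does not grow.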
UnsubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/UnsubscribeCommand.java
@ReadOnly
@Command("unsubscribe")
@ParamLength(1)
@PubSubAllowed
public class UnsubscribeCommand implements DBCommand, SubscriptionSupport {

    private static final String UNSUBSCRIBE = "unsubscribe";

    @Override
    public RedisToken execute(Database db, Request request) {
        Database admin = getAdminDatabase(request.getServerContext());
        String sessionId = getSessionId(request);
        Sequence<SafeString> channels = getChannels(request);
        int i = channels.size();
        List<Object> result = new LinkedList<>();
        for (SafeString channel : request.getParams()) {
            removeSubscription(admin, sessionId, channel);
            getSessionState(request.getSession()).removeSubscription(channel);
            result.addAll(asList(UNSUBSCRIBE, channel, --i));
        }
        return convert(result);
    }

    private String getSessionId(Request request) {
        return request.getSession().getId();
    }

    private Sequence<SafeString> getChannels(Request request) {
        return getSessionState(request.getSession()).getSubscriptions();
    }
}
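- UnsubscribeCommand implements the DBCommand and SubscriptionSupport interfaces. Its execute method iterates over the channels passed as request parameters and, for each one, calls removeSubscription(admin, sessionId, channel) and getSessionState(request.getSession()).removeSubscription(channel), decrementing the count reported in the reply.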
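Unsubscribing touches two places: the server-wide registry kept in the admin database and the session's own state. The sketch below models both with plain maps. It assumes the admin side amounts to a channel-to-session-ids mapping, which the article does not confirm because the body of removeSubscription is not quoted; UnsubscribeSketch and its fields are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical two-level bookkeeping; not claudb's actual data layout.
final class UnsubscribeSketch {
    // Server-wide view (assumed shape): channel -> ids of subscribed sessions
    final Map<String, Set<String>> registry = new HashMap<>();
    // Per-session view: session id -> channels that session subscribed to
    final Map<String, Set<String>> sessions = new HashMap<>();

    void subscribe(String sessionId, String channel) {
        registry.computeIfAbsent(channel, k -> new HashSet<>()).add(sessionId);
        sessions.computeIfAbsent(sessionId, k -> new LinkedHashSet<>()).add(channel);
    }

    // Removes the session from both views and reports a decreasing count per channel.
    List<Object> unsubscribe(String sessionId, String... channels) {
        Set<String> own = sessions.computeIfAbsent(sessionId, k -> new LinkedHashSet<>());
        int i = own.size();
        List<Object> reply = new ArrayList<>();
        for (String channel : channels) {
            Set<String> subscribers = registry.get(channel);
            if (subscribers != null) {
                subscribers.remove(sessionId);
                if (subscribers.isEmpty()) {
                    registry.remove(channel); // drop channels nobody listens to
                }
            }
            own.remove(channel);
            reply.addAll(Arrays.asList("unsubscribe", channel, --i));
        }
        return reply;
    }
}
```

Keeping both views in step matters because the server-wide one is what a publish would consult, while the per-session one supplies the count reported back to the client.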
PatternSubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternSubscribeCommand.java
@ReadOnly
@Command("psubscribe")
@ParamLength(1)
@PubSubAllowed
public class PatternSubscribeCommand implements DBCommand, PatternSubscriptionSupport {

    private static final String PSUBSCRIBE = "psubscribe";

    @Override
    public RedisToken execute(Database db, Request request) {
        Database admin = getAdminDatabase(request.getServerContext());
        String sessionId = getSessionId(request);
        Sequence<SafeString> channels = getChannels(request);
        int i = channels.size();
        List<Object> result = new LinkedList<>();
        for (SafeString pattern : request.getParams()) {
            addPatternSubscription(admin, sessionId, pattern);
            getSessionState(request.getSession()).addSubscription(pattern);
            result.addAll(asList(PSUBSCRIBE, pattern, ++i));
        }
        return convert(result);
    }

    private String getSessionId(Request request) {
        return request.getSession().getId();
    }

    private Sequence<SafeString> getChannels(Request request) {
        return getSessionState(request.getSession()).getSubscriptions();
    }
}
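- PatternSubscribeCommand implements the DBCommand and PatternSubscriptionSupport interfaces. Its execute method iterates over the patterns passed as request parameters and, for each one, calls addPatternSubscription(admin, sessionId, pattern) and getSessionState(request.getSession()).addSubscription(pattern). Note that the session state records patterns through the same addSubscription call that SubscribeCommand uses for channels.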
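Since the quoted code stores patterns via the same addSubscription call as plain channels and reads the starting count from getSubscriptions().size(), the count in a psubscribe reply appears to cover channels and patterns together. A minimal sketch of that behaviour, assuming a single shared set per session (SharedSessionStateSketch and register are hypothetical names, not claudb classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the per-session state; not claudb's actual class.
final class SharedSessionStateSketch {
    // One collection holds both channel names and patterns.
    private final Set<String> subscriptions = new LinkedHashSet<>();

    List<Object> subscribe(String... channels) {
        return register("subscribe", channels);
    }

    List<Object> psubscribe(String... patterns) {
        return register("psubscribe", patterns);
    }

    // Shared by both commands: the count continues from whatever the session already holds.
    private List<Object> register(String kind, String[] names) {
        int i = subscriptions.size();
        List<Object> reply = new ArrayList<>();
        for (String name : names) {
            subscriptions.add(name);
            reply.addAll(Arrays.asList(kind, name, ++i));
        }
        return reply;
    }
}
```

With this model, subscribe("news") followed by psubscribe("news.*") reports 1 and then 2, so the second reply reflects the total number of subscriptions of either kind held by the session.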
PatternUnsubscribeCommand
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternUnsubscribeCommand.java
@ReadOnly
@Command("punsubscribe")
@ParamLength(1)
@PubSubAllowed
public class PatternUnsubscribeCommand implements DBCommand, PatternSubscriptionSupport {

    private static final String PUNSUBSCRIBE = "punsubscribe";

    @Override
    public RedisToken execute(Database db, Request request) {
        Database admin = getAdminDatabase(request.getServerContext());
        String sessionId = getSessionId(request);
        Sequence<SafeString> channels = getChannels(request);
        int i = channels.size();
        List<Object> result = new LinkedList<>();
        for (SafeString channel : request.getParams()) {
            removePatternSubscription(admin, sessionId, channel);
            getSessionState(request.getSession()).removeSubscription(channel);
            result.addAll(asList(PUNSUBSCRIBE, channel, --i));
        }
        return convert(result);
    }

    private String getSessionId(Request request) {
        return request.getSession().getId();
    }

    private Sequence<SafeString> getChannels(Request request) {
        return getSessionState(request.getSession()).getSubscriptions();
    }
}
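- PatternUnsubscribeCommand implements the DBCommand and PatternSubscriptionSupport interfaces. Its execute method iterates over the request parameters (patterns, although the loop variable is named channel) and, for each one, calls removePatternSubscription(admin, sessionId, channel) and getSessionState(request.getSession()).removeSubscription(channel).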
Summary
The pubsub-related commands in claudb are PublishCommand, SubscribeCommand, UnsubscribeCommand, PatternSubscribeCommand, and PatternUnsubscribeCommand.
doc
- command/pubsub