Preface
This article takes a look at claudb's NotificationManager.
NotificationManager
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/event/NotificationManager.java
public class NotificationManager implements PatternSubscriptionSupport {

  private final DBServerContext server;

  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  public NotificationManager(DBServerContext server) {
    this.server = server;
  }

  public void start() {
    // nothing to do
  }

  public void stop() {
    executor.shutdown();
  }

  public void enqueue(Event event) {
    executor.execute(() -> patternPublish(server, event.getChannel(), event.getValue()));
  }
}
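- NotificationManager implements the PatternSubscriptionSupport interface. Its constructor takes a DBServerContext, and it exposes start, stop, and enqueue methods: start does nothing, stop calls executor.shutdown(), and enqueue uses the single-thread executor to run patternPublish(server, event.getChannel(), event.getValue()) asynchronously.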
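To make the threading behaviour concrete, here is a minimal sketch of the same pattern outside claudb. SingleThreadNotifier and its BiConsumer publisher are hypothetical stand-ins for NotificationManager and patternPublish. The stop method here also waits for termination, which the original stop() does not do; that is added only so the example can observe the result.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for NotificationManager: events are handed to a
// single worker thread, so the caller does not block on delivery.
class SingleThreadNotifier {

  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  // Stands in for patternPublish(server, channel, value).
  private final BiConsumer<String, String> publisher;

  SingleThreadNotifier(BiConsumer<String, String> publisher) {
    this.publisher = publisher;
  }

  void enqueue(String channel, String value) {
    // A single-thread executor runs tasks one at a time, in submission order.
    executor.execute(() -> publisher.accept(channel, value));
  }

  // shutdown() rejects new tasks but lets already queued ones finish.
  // Waiting for termination is an addition of this sketch.
  boolean stop(long timeoutMillis) {
    executor.shutdown();
    try {
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> delivered = new CopyOnWriteArrayList<>();
    SingleThreadNotifier notifier =
        new SingleThreadNotifier((channel, value) -> delivered.add(channel + "=" + value));
    notifier.enqueue("a", "1");
    notifier.enqueue("b", "2");
    notifier.stop(1000);
    System.out.println(delivered);
  }
}
```

Because there is only one worker thread, notifications are delivered in the order they were enqueued, and a slow subscriber delays later notifications rather than the thread that called enqueue.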
patternPublish
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternSubscriptionSupport.java
public interface PatternSubscriptionSupport extends BaseSubscriptionSupport {

  String PSUBSCRIPTION_PREFIX = "psubscription:";
  String PMESSAGE = "pmessage";

  //......

  default int patternPublish(DBServerContext server, String channel, SafeString message) {
    int count = 0;
    for (Tuple2<String, ImmutableSet<SafeString>> entry : getPatternSubscriptions(server.getAdminDatabase(), channel)) {
      count += publish(server, entry.get2(), toPatternMessage(entry.get1(), channel, message));
    }
    return count;
  }

  default ImmutableSet<Tuple2<String, ImmutableSet<SafeString>>> getPatternSubscriptions(Database admin, String channel) {
    return getPatternSubscriptions(admin).entries().filter(subscriptionApplyTo(channel));
  }

  //......
}
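- The patternPublish method iterates over getPatternSubscriptions(server.getAdminDatabase(), channel), which keeps only the subscription entries that apply to the channel. For each entry it calls publish(server, entry.get2(), toPatternMessage(entry.get1(), channel, message)) and adds the result to the count it returns.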
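The filter-then-publish loop can be sketched with plain collections. The excerpt elides both subscriptionApplyTo and toPatternMessage, so everything below is an assumption: matches is a hypothetical glob matcher ('*' for any run of characters, '?' for exactly one), the message layout simply lists the pmessage marker, pattern, channel, and payload, and the outbox list stands in for delivery to sessions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

class PatternPublishSketch {

  // Hypothetical glob matcher, not claudb's subscriptionApplyTo:
  // '*' matches any run of characters, '?' matches exactly one.
  static boolean matches(String glob, String channel) {
    StringBuilder regex = new StringBuilder();
    for (char c : glob.toCharArray()) {
      if (c == '*') {
        regex.append(".*");
      } else if (c == '?') {
        regex.append('.');
      } else {
        regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.matches(regex.toString(), channel);
  }

  // Mirrors the shape of patternPublish: visit each pattern subscription,
  // keep those that apply to the channel, deliver to every subscribed
  // client, and return the number of deliveries.
  static int patternPublish(Map<String, Set<String>> subscriptions,
                            String channel, String message, List<String> outbox) {
    int count = 0;
    for (Map.Entry<String, Set<String>> entry : subscriptions.entrySet()) {
      if (!matches(entry.getKey(), channel)) {
        continue;
      }
      for (String client : entry.getValue()) {
        outbox.add(client + " <- [pmessage, " + entry.getKey() + ", " + channel + ", " + message + "]");
      }
      count += entry.getValue().size();
    }
    return count;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
    subscriptions.put("news.*", Set.of("session-1", "session-2"));
    subscriptions.put("sport.?", Set.of("session-3"));

    List<String> outbox = new ArrayList<>();
    int count = patternPublish(subscriptions, "news.tech", "hello", outbox);
    System.out.println(count + " deliveries"); // prints "2 deliveries"
    outbox.forEach(System.out::println);
  }
}
```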
BaseSubscriptionSupport
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/BaseSubscriptionSupport.java
public interface BaseSubscriptionSupport {

  default void addSubscription(String suffix, Database admin, String sessionId, SafeString channel) {
    admin.merge(safeKey(suffix + channel), set(safeString(sessionId)),
        (oldValue, newValue) -> set(oldValue.getSet().appendAll(newValue.getSet())));
  }

  default void removeSubscription(String suffix, Database admin, String sessionId, SafeString channel) {
    admin.merge(safeKey(suffix + channel), set(safeString(sessionId)),
        (oldValue, newValue) -> set(oldValue.getSet().removeAll(newValue.getSet())));
  }

  default int publish(DBServerContext server, ImmutableSet<SafeString> clients, RedisToken message) {
    clients.forEach(client -> server.publish(client.toString(), message));
    return clients.size();
  }
}
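- BaseSubscriptionSupport provides the publish method, which iterates over clients, calls server.publish(client.toString(), message) for each one, and returns clients.size(). Its addSubscription and removeSubscription methods use merge to update the set of session ids stored in the admin database under the key suffix + channel.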
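The merge-based bookkeeping can be illustrated with java.util.Map.merge as a rough analogue. SubscriptionRegistrySketch and its subscribers method are hypothetical, and whether claudb's Database.merge behaves exactly like Map.merge is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical analogue of the admin database: key = prefix + channel,
// value = the set of subscribed session ids.
class SubscriptionRegistrySketch {

  private final Map<String, Set<String>> admin = new HashMap<>();

  void addSubscription(String prefix, String sessionId, String channel) {
    // If the key is absent the new set is stored as is;
    // otherwise the union of old and new sets replaces it.
    admin.merge(prefix + channel, Set.of(sessionId), (oldValue, newValue) -> {
      Set<String> merged = new HashSet<>(oldValue);
      merged.addAll(newValue);
      return merged;
    });
  }

  void removeSubscription(String prefix, String sessionId, String channel) {
    // Note: with Map.merge, calling this for a key that does not exist
    // would store the session id instead of removing anything.
    admin.merge(prefix + channel, Set.of(sessionId), (oldValue, newValue) -> {
      Set<String> remaining = new HashSet<>(oldValue);
      remaining.removeAll(newValue);
      return remaining;
    });
  }

  Set<String> subscribers(String prefix, String channel) {
    return admin.getOrDefault(prefix + channel, Set.of());
  }

  public static void main(String[] args) {
    SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
    registry.addSubscription("psubscription:", "session-1", "news.*");
    registry.addSubscription("psubscription:", "session-2", "news.*");
    registry.removeSubscription("psubscription:", "session-1", "news.*");
    System.out.println(registry.subscribers("psubscription:", "news.*")); // prints "[session-2]"
  }
}
```

One design consequence worth noting: after the last subscriber is removed, the key stays in the map with an empty set, because the remapping function returns an empty set rather than null.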
publish
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext {

  private static final String STATE = "state";

  private static final Logger LOGGER = LoggerFactory.getLogger(ClauDB.class);

  private DatabaseCleaner cleaner;
  private Option<PersistenceManager> persistence;
  private Option<NotificationManager> notifications;

  private final DBConfig config;

  public ClauDB() {
    this(DEFAULT_HOST, DEFAULT_PORT);
  }

  public ClauDB(String host, int port) {
    this(host, port, DBConfig.builder().build());
  }

  public ClauDB(String host, int port, DBConfig config) {
    super(host, port, new DBCommandSuite(), new DBSessionListener());
    this.config = config;
  }

  //......

  @Override
  public void publish(String sourceKey, RedisToken message) {
    Session session = getSession(sourceKey);
    if (session != null) {
      session.publish(message);
    }
  }

  //......
}
- ClauDB's publish method looks up the session with getSession(sourceKey) and, if the session is not null, calls session.publish(message).
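Summary
NotificationManager implements the PatternSubscriptionSupport interface. Its constructor takes a DBServerContext, and it exposes start, stop, and enqueue methods. stop calls executor.shutdown(), and enqueue uses the executor to run patternPublish(server, event.getChannel(), event.getValue()) asynchronously. From there, patternPublish calls publish for each subscription that applies to the channel, publish calls server.publish for each client, and ClauDB's publish hands the message to the matching session via session.publish(message).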
doc
- NotificationManager