本文次要钻研一下claudb的NotificationManager

NotificationManager

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/event/NotificationManager.java

public class NotificationManager implements PatternSubscriptionSupport {  private final DBServerContext server;  private final ExecutorService executor = Executors.newSingleThreadExecutor();  public NotificationManager(DBServerContext server) {    this.server = server;  }  public void start() {    // nothing to do  }  public void stop() {    executor.shutdown();  }  public void enqueue(Event event) {    executor.execute(() -> patternPublish(server, event.getChannel(), event.getValue()));  }}
  • NotificationManager实现了PatternSubscriptionSupport接口,其结构器要求输出DBServerContext,它提供了start、stop、enqueue办法;其stop办法执行executor.shutdown();其enqueue办法应用executor异步执行patternPublish(server, event.getChannel(), event.getValue())

patternPublish

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternSubscriptionSupport.java

public interface PatternSubscriptionSupport extends BaseSubscriptionSupport {  String PSUBSCRIPTION_PREFIX = "psubscription:";  String PMESSAGE = "pmessage";  //......  default int patternPublish(DBServerContext server, String channel, SafeString message) {    int count = 0;    for (Tuple2<String, ImmutableSet<SafeString>> entry : getPatternSubscriptions(server.getAdminDatabase(), channel)) {      count += publish(server, entry.get2(), toPatternMessage(entry.get1(), channel, message));    }    return count;  }  default ImmutableSet<Tuple2<String, ImmutableSet<SafeString>>> getPatternSubscriptions(Database admin, String channel) {    return getPatternSubscriptions(admin).entries().filter(subscriptionApplyTo(channel));  }  //......}
  • patternPublish办法遍历getPatternSubscriptions(server.getAdminDatabase(), channel),挨个执行publish(server, entry.get2(), toPatternMessage(entry.get1(), channel, message)

BaseSubscriptionSupport

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/BaseSubscriptionSupport.java

public interface BaseSubscriptionSupport{  default void addSubscription(String suffix, Database admin, String sessionId, SafeString channel) {    admin.merge(safeKey(suffix + channel), set(safeString(sessionId)),        (oldValue, newValue) -> set(oldValue.getSet().appendAll(newValue.getSet())));  }  default void removeSubscription(String suffix, Database admin, String sessionId, SafeString channel) {      admin.merge(safeKey(suffix + channel), set(safeString(sessionId)),        (oldValue, newValue) -> set(oldValue.getSet().removeAll(newValue.getSet())));  }  default int publish(DBServerContext server, ImmutableSet<SafeString> clients, RedisToken message) {    clients.forEach(client -> server.publish(client.toString(), message));    return clients.size();  }}
  • BaseSubscriptionSupport提供了publish办法,它遍历clients,挨个执行server.publish(client.toString(), message)

publish

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

public class ClauDB extends RespServerContext implements DBServerContext {  private static final String STATE = "state";  private static final Logger LOGGER = LoggerFactory.getLogger(ClauDB.class);  private DatabaseCleaner cleaner;  private Option<PersistenceManager> persistence;  private Option<NotificationManager> notifications;  private final DBConfig config;  public ClauDB() {    this(DEFAULT_HOST, DEFAULT_PORT);  }  public ClauDB(String host, int port) {    this(host, port, DBConfig.builder().build());  }  public ClauDB(String host, int port, DBConfig config) {    super(host, port, new DBCommandSuite(), new DBSessionListener());    this.config = config;  }  //......  @Override  public void publish(String sourceKey, RedisToken message) {    Session session = getSession(sourceKey);    if (session != null) {      session.publish(message);    }  }  //......}
  • ClauDB的publish办法执行的是session.publish(message)

小结

NotificationManager实现了PatternSubscriptionSupport接口,其结构器要求输出DBServerContext,它提供了start、stop、enqueue办法;其stop办法执行executor.shutdown();其enqueue办法应用executor异步执行patternPublish(server, event.getChannel(), event.getValue())

doc

  • NotificationManager