本文主要研究一下claudb的zset command

SortedSetAddCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetAddCommand.java

@Command("zadd")@ParamLength(3)@ParamType(DataType.ZSET)public class SortedSetAddCommand implements DBCommand {  @Override  public RedisToken execute(Database db, Request request) {    try {      DatabaseValue initial = db.getOrDefault(safeKey(request.getParam(0)), DatabaseValue.EMPTY_ZSET);      DatabaseValue result = db.merge(safeKey(request.getParam(0)), parseInput(request),          (oldValue, newValue) -> {            Set<Entry<Double, SafeString>> merge = new SortedSet();            merge.addAll(oldValue.getSortedSet());            merge.addAll(newValue.getSortedSet());            return zset(merge);          });      return integer(changed(initial.getSortedSet(), result.getSortedSet()));    } catch (NumberFormatException e) {      return error("ERR value is not a valid float");    }  }  private int changed(Set<Entry<Double, SafeString>> input, Set<Entry<Double, SafeString>> result) {    return result.size() - input.size();  }  private DatabaseValue parseInput(Request request) {    Set<Entry<Double, SafeString>> set = new SortedSet();    SafeString score = null;    for (SafeString string : request.getParams().stream().skip(1).collect(toList())) {      if (score != null) {        set.add(score(parseFloat(score.toString()), string));        score =  null;      } else {        score = string;      }    }    return zset(set);  }}
  • SortedSetAddCommand实现了DBCommand接口,其execute方法先获取initial,然后执行db.merge方法,merge时先添加oldValue.getSortedSet()、再添加newValue.getSortedSet(),最后返回合并前后集合大小的差值

SortedSetCardinalityCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetCardinalityCommand.java

/**
 * Handles the {@code zcard} command: replies with the number of entries in the sorted set
 * stored under the key, using an empty sorted set when the key is absent.
 */
@ReadOnly
@Command("zcard")
@ParamLength(1)
@ParamType(DataType.ZSET)
public class SortedSetCardinalityCommand implements DBCommand {

  /**
   * Executes the command.
   *
   * @param db database the key is looked up in
   * @param request request whose first parameter is the key
   * @return an integer token carrying the set size
   */
  @Override
  public RedisToken execute(Database db, Request request) {
    return integer(
        db.getOrDefault(safeKey(request.getParam(0)), DatabaseValue.EMPTY_ZSET)
            .getSortedSet()
            .size());
  }
}
  • SortedSetCardinalityCommand实现了DBCommand接口,其execute方法先通过db.getOrDefault获取value,再获取value.getSortedSet(),最后返回其size

SortedSetRemoveCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetRemoveCommand.java

@Command("zrem")@ParamLength(2)@ParamType(DataType.ZSET)public class SortedSetRemoveCommand implements DBCommand {  @Override  public RedisToken execute(Database db, Request request) {    List<SafeString> items =  request.getParams().stream().skip(1).collect(toList());    List<SafeString> removed = new LinkedList<>();    db.merge(safeKey(request.getParam(0)), DatabaseValue.EMPTY_ZSET,             (oldValue, newValue) -> {               Set<Entry<Double, SafeString>> merge = new SortedSet();               merge.addAll(oldValue.getSortedSet());               for (SafeString item : items) {                 if (merge.remove(score(0, item))) {                   removed.add(item);                 }               }               return zset(merge);             });    return integer(removed.size());  }}
  • SortedSetRemoveCommand实现了DBCommand接口,其execute方法先从request参数提取items,然后执行db.merge,在merge中遍历items逐个执行merge.remove(score(0, item)),最后返回被移除元素的个数

SortedSetRangeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetRangeCommand.java

@ReadOnly@Command("zrange")@ParamLength(3)@ParamType(DataType.ZSET)public class SortedSetRangeCommand implements DBCommand {  private static final String PARAM_WITHSCORES = "WITHSCORES";  @Override  public RedisToken execute(Database db, Request request) {    try {      DatabaseValue value = db.getOrDefault(safeKey(request.getParam(0)), DatabaseValue.EMPTY_ZSET);      NavigableSet<Entry<Double, SafeString>> set = value.getSortedSet();      int from = Integer.parseInt(request.getParam(1).toString());      if (from < 0) {        from = set.size() + from;      }      int to = Integer.parseInt(request.getParam(2).toString());      if (to < 0) {        to = set.size() + to;      }      List<Object> result = emptyList();      if (from <= to) {        Option<SafeString> withScores = request.getOptionalParam(3);        if (withScores.isPresent() && withScores.get().toString().equalsIgnoreCase(PARAM_WITHSCORES)) {          result = set.stream().skip(from).limit((to - from) + 1l)              .flatMap(entry -> Stream.of(entry.getValue(), entry.getKey())).collect(toList());        } else {          result = set.stream().skip(from).limit((to - from) + 1l)              .map(Entry::getValue).collect(toList());        }      }      return convert(result);    } catch (NumberFormatException e) {      return error("ERR value is not an integer or out of range");    }  }}
  • SortedSetRangeCommand实现了DBCommand接口,其execute方法执行db.getOrDefault获取value,然后通过set.stream().skip(from).limit((to - from) + 1l)获取result

SortedSetRangeByScoreCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetRangeByScoreCommand.java

@ReadOnly@Command("zrangebyscore")@ParamLength(3)@ParamType(DataType.ZSET)public class SortedSetRangeByScoreCommand implements DBCommand {  private static final String EXCLUSIVE = "(";  private static final String MINUS_INFINITY = "-inf";  private static final String INIFITY = "+inf";  private static final String PARAM_WITHSCORES = "WITHSCORES";  private static final String PARAM_LIMIT = "LIMIT";  @Override  public RedisToken execute(Database db, Request request) {    try {      DatabaseValue value = db.getOrDefault(safeKey(request.getParam(0)), DatabaseValue.EMPTY_ZSET);      NavigableSet<Entry<Double, SafeString>> set = value.getSortedSet();      float from = parseRange(request.getParam(1).toString());      float to = parseRange(request.getParam(2).toString());      Options options = parseOptions(request);      Set<Entry<Double, SafeString>> range = set.subSet(          score(from, SafeString.EMPTY_STRING), inclusive(request.getParam(1)),          score(to, SafeString.EMPTY_STRING), inclusive(request.getParam(2)));      List<Object> result = emptyList();      if (from <= to) {        if (options.withScores) {          result = range.stream().flatMap(              entry -> Stream.of(entry.getValue(), entry.getKey())).collect(toList());        } else {          result = range.stream().map(Entry::getValue).collect(toList());        }        if (options.withLimit) {          result = result.stream().skip(options.offset).limit(options.count).collect(toList());        }      }      return convert(result);    } catch (NumberFormatException e) {      return error("ERR value is not an float or out of range");    }  }  private Options parseOptions(Request request) {    Options options = new Options();    for (int i = 3; i < request.getLength(); i++) {      String param = request.getParam(i).toString();      if (param.equalsIgnoreCase(PARAM_LIMIT)) {        options.withLimit = true;        options.offset = parseInt(request.getParam(++i).toString());        options.count = 
parseInt(request.getParam(++i).toString());      } else if (param.equalsIgnoreCase(PARAM_WITHSCORES)) {        options.withScores = true;      }    }    return options;  }  private boolean inclusive(SafeString param) {    return !param.toString().startsWith(EXCLUSIVE);  }  private float parseRange(String param) {    switch (param) {    case INIFITY:      return Float.MAX_VALUE;    case MINUS_INFINITY:      return Float.MIN_VALUE;    default:      if (param.startsWith(EXCLUSIVE)) {        return Float.parseFloat(param.substring(1));      }      return Float.parseFloat(param);    }  }  private static class Options {    private boolean withScores;    private boolean withLimit;    private int offset;    private int count;  }}
  • SortedSetRangeByScoreCommand实现了DBCommand接口,其execute方法执行db.getOrDefault获取value,然后通过set.subSet(score(from, SafeString.EMPTY_STRING), inclusive(request.getParam(1)),score(to, SafeString.EMPTY_STRING), inclusive(request.getParam(2)))获取range,最后从range提取result

SortedSetReverseRangeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetReverseRangeCommand.java

@ReadOnly@Command("zrevrange")@ParamLength(3)@ParamType(DataType.ZSET)public class SortedSetReverseRangeCommand implements DBCommand {  private static final String PARAM_WITHSCORES = "WITHSCORES";  @Override  public RedisToken execute(Database db, Request request) {    try {      DatabaseValue value = db.getOrDefault(safeKey(request.getParam(0)), DatabaseValue.EMPTY_ZSET);      NavigableSet<Entry<Double, SafeString>> set = value.getSortedSet();      int from = Integer.parseInt(request.getParam(2).toString());      if (from < 0) {        from = set.size() + from;      }      int to = Integer.parseInt(request.getParam(1).toString());      if (to < 0) {        to = set.size() + to;      }      List<Object> result = emptyList();      if (from <= to) {        Option<SafeString> withScores = request.getOptionalParam(3);        if (withScores.isPresent() && withScores.get().toString().equalsIgnoreCase(PARAM_WITHSCORES)) {          result = set.stream().skip(from).limit((to - from) + 1l)              .flatMap(item -> Stream.of(item.getValue(), item.getKey())).collect(toList());        } else {          result = set.stream().skip(from).limit((to - from) + 1l)              .map(Entry::getValue).collect(toList());        }      }      reverse(result);      return convert(result);    } catch (NumberFormatException e) {      return error("ERR value is not an integer or out of range");    }  }}
  • SortedSetReverseRangeCommand实现了DBCommand接口,其execute方法执行db.getOrDefault获取value,然后通过set.stream().skip(from).limit((to - from) + 1l)获取result,最后对result执行reverse

SortedSetIncrementByCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/zset/SortedSetIncrementByCommand.java

@Command("zincrby")@ParamLength(3)@ParamType(DataType.ZSET)public class SortedSetIncrementByCommand implements DBCommand {  @Override  public RedisToken execute(Database db, Request request) {    try {      DatabaseKey zkey = safeKey(request.getParam(0));      DatabaseValue value = db.getOrDefault(zkey, DatabaseValue.EMPTY_ZSET);      NavigableSet<Entry<Double, SafeString>> set = value.getSortedSet();      SafeString key = request.getParam(2);      Double increment = Double.parseDouble(request.getParam(1).toString());      Entry<Double, SafeString> newValue = merge(set, key, increment);      SortedSet result = new SortedSet();      result.addAll(set);      result.remove(newValue);      result.add(newValue);      db.put(zkey, zset(result));      return string(newValue.getKey().toString());    } catch (NumberFormatException e) {      return error("ERR value is not an integer or out of range");    }  }  private Entry<Double, SafeString>          merge(NavigableSet<Entry<Double, SafeString>> set, SafeString key, Double increment) {    return score(findByKey(set, key).getKey() + increment, key);  }  private Entry<Double, SafeString> findByKey(NavigableSet<Entry<Double, SafeString>> set, SafeString key) {    // TODO: O(n) search, to fix forget the NavigableSet and use directly the SortedSet to get by key    return set.stream().filter(entry -> entry.getValue().equals(key)).findFirst().orElse(score(0, key));  }}
  • SortedSetIncrementByCommand实现了DBCommand接口,其execute方法执行db.getOrDefault获取value,然后通过merge获取newValue,再用db.put写回更新后的集合

小结

claudb sorted set相关的command有SortedSetAddCommand、SortedSetCardinalityCommand、SortedSetRemoveCommand、SortedSetRangeCommand、SortedSetRangeByScoreCommand、SortedSetReverseRangeCommand、SortedSetIncrementByCommand

doc

  • command/zset