聊聊CRDT

30次阅读

共计 14585 个字符,预计需要花费 37 分钟才能阅读完成。

本文主要研究一下 CRDT

CRDT

CRDT 是 Conflict-free Replicated Data Type 的简称,也称为 a passive synchronisation,即免冲突的可复制的数据类型,这种数据类型可以用于数据跨网络复制并且可以自动解决冲突达到一致,非常适合使用 AP 架构的系统在各个 partition 之间复制数据时使用;具体实现上可以分为 State-based 的 CvRDT、Operation-based 的 CmRDT、Delta-based、Pure operation-based 等

Consistency without Consensus,guarantee convergence to the same value in spite of network delays, partitions and message reordering

State-based(CvRDT)

  • CvRDT 即 Convergent Replicated Data Type 的简称,也称为 an active synchronisation,通常在诸如 NFS, AFS, Coda 的文件系统以及诸如 Riak, Dynamo 的 KV 存储中使用
  • 这种方式是通过传递整个 object 的 states 来完成,需要定义一个 merge 函数来合并输入的 object states
  • 该 merge 函数需要满足 commutative 及 idempotent,即 monotonically increasing,以做到可以重试及与 order 无关

Operation-based(CmRDT)

  • CmRDT 即 Commutative Replicated Data Type 的简称,通常在诸如 Bayou, Rover, IceCube, Telex 的 cooperative systems 中使用
  • 这种方式是通过传递 operations 来完成,需要 prepare 方法生成 operations,以及 effect 方法将输入的 operations 表示的变更作用在 local state 中
  • 这里要求传输协议是可靠的,如果可能重复传输则要求 effect 是幂等的,而且对 order 有一定要求,如果不能保证 order 则需要 effect 叠加在一起是 or 的效果

Delta-based

Delta-based 可以理解为是结合 State-based 及 Operation-based 的一种改进,它通过 delta-mutators 来进行 replicate

Pure operation-based

通常 Operation-based 的方式需要 prepare 方法生成 operations,这里可能存在延时,Pure operation-based 是指 prepare 的实现不是通过对比 state 生成 operations,而是仅仅返回现成的 operations,这就需要记录每一步对 object state 操作的 operations

Convergent Operations

对于 CRDT 来说,为了实现 Conflict-free Replicated 对数据结构的一些操作需要满足如下条件:

  • Associative

(a+(b+c)=(a+b)+c),即 grouping 没有影响

  • Commutative

(a+b=b+a),即 order 没有影响

  • Idempotent

(a+a=a),即 duplication 没有影响 ( 幂等)

基本数据类型

CRDT 的基本数据类型有 Counters、Registers、Sets

Counters

  • Grow-only counter(G-Counter)

使用 max 函数来进行 merge

  • Positive-negative counter(PN-Counter)

使用两个 G -Counter 来实现,一个用于递增,一个用于递减,最后取值进行 sum

Registers

register 有 assign()及 value()两种操作

  • Last Write Wins -register(LWW-Register)

给每个 assign 操作添加 unique ids,比如 timestamps 或者 vector clock,使用 max 函数进行 merge

  • Multi-valued -register(MV-Register)

类似 G -Counter,每次 assign 都会新增一个版本,使用 max 函数进行 merge

Sets

  • Grow-only set(G-Set)

使用 union 操作进行 merge

  • Two-phase set(2P-Set)

使用两个 G -Set 来实现,一个 addSet 用于添加,一个 removeSet 用于移除

  • Last write wins set(LWW-element Set)

类似 2P-Set,有一个 addSet,一个 removeSet,不过对于元素增加了 timestamp 信息,且 timestamp 较高的 add 及 remove 优先

  • Observed-remove set(OR-Set)

类似 2P-Set,有一个 addSet,一个 removeSet,不过对于元素增加了 tag 信息,对于同一个 tag 的操作 add 优先于 remove

其他数据类型

Array

关于 Array 有 Replicated Growable Array(RGA),支持 addRight(v, a)操作

Graph

Graph 可以基于 Sets 结构实现,不过需要处理并发的 addEdge(u, v)、removeVertex(u)操作

Map

Map 需要处理并发的 put、rmv 操作

实例

这里使用 wurmloch-crdt 的实现

GCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GCounter.java

public class GCounter extends AbstractCrdt<GCounter, GCounter.UpdateCommand> {

    // fields
    private Map<String, Long> entries = HashMap.empty();


    // constructor
    public GCounter(String nodeId, String crdtId) {super(nodeId, crdtId, BehaviorProcessor.create());
    }


    // crdt
    @Override
    protected Option<UpdateCommand> processCommand(UpdateCommand command) {
        final Map<String, Long> oldEntries = entries;
        entries = entries.merge(command.entries, Math::max);
        return entries.equals(oldEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, entries));
    }


    // core functionality
    public long get() {return entries.values().sum().longValue();
    }

    public void increment() {increment(1L);
    }

    public void increment(long value) {if (value < 1L) {throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        entries = entries.put(nodeId, entries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                entries
        ));
    }

    //......
}
  • 这里 GCounter 使用 HashMap 来实现,其 processCommand 接收 UpdateCommand,采用 HashMap 的 merge 方法进行合并,其中 BiFunction 为 Math::max;get()方法对 entries.values()进行 sum 得出结果

PNCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/PNCounter.java

public class PNCounter extends AbstractCrdt<PNCounter, PNCounter.UpdateCommand> {

    // fields
    private Map<String, Long> pEntries = HashMap.empty();
    private Map<String, Long> nEntries = HashMap.empty();


    // constructor
    public PNCounter(String nodeId, String crtdId) {super(nodeId, crtdId, BehaviorProcessor.create());
    }


    // crdt
    protected Option<UpdateCommand> processCommand(PNCounter.UpdateCommand command) {
        final Map<String, Long> oldPEntries = pEntries;
        final Map<String, Long> oldNEntries = nEntries;
        pEntries = pEntries.merge(command.pEntries, Math::max);
        nEntries = nEntries.merge(command.nEntries, Math::max);
        return pEntries.equals(oldPEntries) && nEntries.equals(oldNEntries)? Option.none()
                : Option.of(new UpdateCommand(crdtId, pEntries, nEntries));
    }


    // core functionality
    public long get() {return pEntries.values().sum().longValue() - nEntries.values().sum().longValue();}

    public void increment() {increment(1L);
    }

    public void increment(long value) {if (value < 1L) {throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        pEntries = pEntries.put(nodeId, pEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    public void decrement() {decrement(1L);
    }

    public void decrement(long value) {if (value < 1L) {throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        nEntries = nEntries.put(nodeId, nEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    //......
}
  • 这里 PNCounter 使用了两个 HashMap 来实现,其中 pEntries 用于递增,nEntries 用于递减;processCommand 采用 HashMap 的 merge 方法分别对 pEntries 及 nEntries 进行合并,其中 BiFunction 为 Math::max;get()方法则使用 pEntries.values()的 sum 减去 nEntries.values()的 sum

LWWRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/LWWRegister.java

public class LWWRegister<T> extends AbstractCrdt<LWWRegister<T>, LWWRegister.SetCommand<T>> {

    // fields
    private T value;
    private StrictVectorClock clock;


    // constructor
    public LWWRegister(String nodeId, String crdtId) {super(nodeId, crdtId, BehaviorProcessor.create());
        this.clock = new StrictVectorClock(nodeId);
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {if (clock.compareTo(command.getClock()) < 0) {clock = clock.merge(command.getClock());
            doSet(command.getValue());
            return Option.of(command);
        }
        return Option.none();}


    // core functionality
    public T get() {return value;}

    public void set(T newValue) {if (! Objects.equals(value, newValue)) {doSet(newValue);
            commands.onNext(new SetCommand<>(
                    crdtId,
                    value,
                    clock
            ));
        }
    }


    // implementation
    private void doSet(T value) {
        this.value = value;
        clock = clock.increment();}

    //......
}
  • 这里 LWWRegister 使用了 StrictVectorClock,其 processCommand 接收 SetCommand,它在本地 clock 小于 command.getClock()会先 merge clock,然后执行 doSet 更新 value,同时更新本地为 clock.increment()

MVRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/MVRegister.java

public class MVRegister<T> extends AbstractCrdt<MVRegister<T>, MVRegister.SetCommand<T>> {

    // fields
    private Array<Entry<T>> entries = Array.empty();


    // constructor
    public MVRegister(String nodeId, String crdtId) {super(nodeId, crdtId, ReplayProcessor.create());
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {final Entry<T> newEntry = command.getEntry();
        if (!entries.exists(entry -> entry.getClock().compareTo(newEntry.getClock()) > 0
                || entry.getClock().equals(newEntry.getClock()))) {
            final Array<Entry<T>> newEntries = entries
                    .filter(entry -> entry.getClock().compareTo(newEntry.getClock()) == 0)
                    .append(newEntry);
            doSet(newEntries);
            return Option.of(command);
        }
        return Option.none();}


    // core functionality
    public Array<T> get() {return entries.map(Entry::getValue);
    }

    public void set(T newValue) {if (entries.size() != 1 || !Objects.equals(entries.head().getValue(), newValue)) {final Entry<T> newEntry = new Entry<>(newValue, incVV());
            doSet(Array.of(newEntry));
            commands.onNext(new SetCommand<>(
                    crdtId,
                    newEntry
            ));
        }
    }


    // implementation
    private void doSet(Array<Entry<T>> newEntries) {entries = newEntries;}

    private VectorClock incVV() {final Array<VectorClock> clocks = entries.map(Entry::getClock);
        final VectorClock mergedClock = clocks.reduceOption(VectorClock::merge).getOrElse(new VectorClock());
        return mergedClock.increment(nodeId);
    }

    //......
}
  • 这里 LWWRegister 使用了 Array 以及 StrictVectorClock,其 processCommand 接收 SetCommand,它在没有 entry 的 clock 大于或者 equals newEntry.getClock()时会创建新的 newEntries,该 newEntries 不包含 clock 与 newEntry.getClock()等值的 entry,同时加入了 newEntry,最后使用 doSet 赋值给本地的 entries

GSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GSet.java

public class GSet<E> extends AbstractSet<E> implements Crdt<GSet<E>, GSet.AddCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Processor<AddCommand<E>, AddCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public GSet(String crdtId) {this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {return crdtId;}

    @Override
    public void subscribe(Subscriber<? super AddCommand<E>> subscriber) {commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends AddCommand<E>> publisher) {Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {final Option<AddCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);
        });
    }

    private Option<AddCommand<E>> processCommand(AddCommand<E> command) {return doAdd(command.getElement())? Option.of(command) : Option.none();}


    // core functionality
    @Override
    public int size() {return elements.size();
    }

    @Override
    public Iterator<E> iterator() {return new GSetIterator();
    }

    @Override
    public boolean add(E element) {commands.onNext(new AddCommand<>(crdtId, element));
        return doAdd(element);
    }


    // implementation
    private synchronized boolean doAdd(E element) {return elements.add(element);
    }

    //......
}
  • 这里 GSet 使用 Set 来实现,其 processCommand 接收 AddCommand,其 doAdd 方法使用 Set 的 add 进行合并

TwoPhaseSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/TwoPSet.java

public class TwoPSet<E> extends AbstractSet<E> implements Crdt<TwoPSet<E>, TwoPSet.TwoPSetCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Set<E> tombstone = new HashSet<>();
    private final Processor<TwoPSetCommand<E>, TwoPSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public TwoPSet(String crdtId) {this.crdtId = Objects.requireNonNull(crdtId, "CrdtId must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {return crdtId;}

    @Override
    public void subscribe(Subscriber<? super TwoPSetCommand<E>> subscriber) {commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends TwoPSetCommand<E>> publisher) {Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {final Option<TwoPSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<TwoPSetCommand<E>> processCommand(TwoPSetCommand<E> command) {if (command instanceof TwoPSet.AddCommand) {return doAdd(((TwoPSet.AddCommand<E>) command).getElement())? Option.of(command) : Option.none();} else if (command instanceof TwoPSet.RemoveCommand) {return doRemove(((TwoPSet.RemoveCommand<E>) command).getElement())? Option.of(command) : Option.none();}
        return Option.none();}


    // core functionality
    @Override
    public int size() {return elements.size();
    }

    @Override
    public Iterator<E> iterator() {return new TwoPSetIterator();
    }

    @Override
    public boolean add(E value) {final boolean changed = doAdd(value);
        if (changed) {commands.onNext(new TwoPSet.AddCommand<>(crdtId, value));
        }
        return changed;
    }


    // implementation
    private boolean doAdd(E value) {return !tombstone.contains(value) && elements.add(value);
    }

    private boolean doRemove(E value) {return tombstone.add(value) | elements.remove(value);
    }

    //......
}
  • 这里 TwoPSet 使用了两个 Set 来实现,其中 elements 用于 add,tombstone 用于 remove;其 processCommand 方法接收 TwoPSetCommand,它有 TwoPSet.AddCommand 及 TwoPSet.RemoveCommand 两个子类,两个 command 分别对应 doAdd 及 doRemove 方法;doAdd 要求 tombstone 不包含该元素并往 elements 添加元素;doRemove 往 tombstone 添加元素并从 elements 移除元素

ORSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/ORSet.java

public class ORSet<E> extends AbstractSet<E> implements Crdt<ORSet<E>, ORSet.ORSetCommand<E>> /*, ObservableSet<E> */ {

    // fields
    private final String crdtId;
    private final Set<Element<E>> elements = new HashSet<>();
    private final Set<Element<E>> tombstone = new HashSet<>();
    private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public ORSet(String crdtId) {this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {return crdtId;}

    @Override
    public void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) {commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) {Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {final Option<ORSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) {if (command instanceof AddCommand) {return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none();} else if (command instanceof RemoveCommand) {return doRemove(((RemoveCommand<E>) command).getElements())? Option.of(command) : Option.none();}
        return Option.none();}


    // core functionality
    @Override
    public int size() {return doElements().size();}

    @Override
    public Iterator<E> iterator() {return new ORSetIterator();
    }

    @Override
    public boolean add(E value) {final boolean contained = doContains(value);
        prepareAdd(value);
        return !contained;
    }


    // implementation
    private static <U> Predicate<Element<U>> matches(U value) {return element -> Objects.equals(value, element.getValue());
    }

    private synchronized boolean doContains(E value) {return elements.parallelStream().anyMatch(matches(value));
    }

    private synchronized Set<E> doElements() {return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet());
    }

    private synchronized void prepareAdd(E value) {final Element<E> element = new Element<>(value, UUID.randomUUID());
        commands.onNext(new AddCommand<>(getCrdtId(), element));
        doAdd(element);
    }

    private synchronized boolean doAdd(Element<E> element) {return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element));
    }

    private synchronized void prepareRemove(E value) {final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet());
        commands.onNext(new RemoveCommand<>(getCrdtId(), removes));
        doRemove(removes);
    }

    private synchronized boolean doRemove(Collection<Element<E>> removes) {return elements.removeAll(removes) | tombstone.addAll(removes);
    }

    //......
}
  • 这里 ORSet 使用了两个 Set 来实现,其中 elements 用于 add,tombstone 用于 remove;其 processCommand 方法接收 ORSetCommand,它有 ORSet.AddCommand 及 ORSet.RemoveCommand 两个子类,两个 command 分别对应 doAdd 及 doRemove 方法;doAdd 方法首先执行 prepareAdd 使用 UUID 创建 element,然后往 elements 添加元素,移除 tombstone;doRemove 方法首先执行 prepareRemove 找出需要移除的 element 集合 removes,然后从 elements 移除 removes 并往 tombstone 添加 removes

小结

  • CRDT 是 Conflict-free Replicated Data Type 的简称,也称为 a passive synchronisation,即免冲突的可复制的数据类型;具体实现上可以分为 State-based 的 CvRDT、Operation-based 的 CmRDT、Delta-based、Pure operation-based 等
  • CvRDT 即 Convergent Replicated Data Type 的简称,也称为 an active synchronisation,通常在诸如 NFS, AFS, Coda 的文件系统以及诸如 Riak, Dynamo 的 KV 存储中使用;CvRDT 即 Convergent Replicated Data Type 的简称,也称为 an active synchronisation,通常在诸如 NFS, AFS, Coda 的文件系统以及诸如 Riak, Dynamo 的 KV 存储中使用
  • 对于 CRDT 来说,为了实现 Conflict-free Replicated 要求对数据结构的操作是 Convergent,即需要满足 Associative、Commutative 及 Idempotent;CRDT 的基本数据类型有 Counters(G-Counter、PN-Counter)、Registers(LWW-Register、MV-Register)、Sets(G-Set、2P-Set、LWW-element Set、OR-Set`)

doc

  • 谈谈 CRDT
  • CRDT——解决最终一致问题的利器
  • Akka Distributed Data Deep Dive
  • Conflict-free replicated data types
  • CRDT: Conflict-free Replicated Data Types
  • Introduction to Conflict-Free Replicated Data Types
  • A Look at Conflict-Free Replicated Data Types (CRDT)
  • Conflict-free Replicated Data Types
  • A comprehensive study of Convergent and Commutative Replicated Data Types
  • wurmloch-crdt

正文完
 0