关于javascript:Yjs代码简析

3次阅读

共计 13136 个字符,预计需要花费 33 分钟才能阅读完成。

CRDT

在多人合作畛域除了上一篇所介绍的 OT 算法,还有后起之秀 CRDT (conflict-free replicated data type) 无抵触复制数据类型:是一种能够协调网络上多个正本达到一致性的算法。
CRDT 又分为两种实现:基于状态的 CmRDT 和基于操作的 CvRDT;然而在理论利用的时候,基于状态的 CmRDT 的个别实用价值比拟低(数据传输体积更大),所以 Yjs 也是基于操作的 CvRDT 的一个实现。

CRDT VS OT

OT CRDT
OT 依赖中心化服务器实现合作 CRDT 算法能够通过 P2P 的形式实现数据的同步
为保障一致性,OT 的算法设计时的复杂度更高 为保障一致性,CRDT 算法设计更简略
OT 的设计更容易保留用户用意 设计一个保留用户用意的 CRDT 算法更艰难
OT 不影响文档体积 CRDT 文档比原文档数据更大

因为 CRDT 对于 OT 算法来说,解决抵触算法是固定的,齐全绕过过 OT 算法中的最难的中央 transfrom 办法的设计(transform 的复杂度会随着 operation 类型增多快速增长,而且保障正确性也须要付出更多的工夫和精力)。

Lamport Clock

在 Yjs 实现外面每个字符都是有一个惟一的 id(clientId, clock),而外面的 clock 就是 Lamport Clock。
因为在分布式外面,所有的节点的时钟是很难放弃准确对立的,想通过赋予一个工夫戳给事件,而后比照事件的工夫戳还原出各个节点的事件程序简直无奈实现,所以 Lamport Timestamp 就被提出来,它通过节点之间替换信息,并应用 logic clock 而不是 true lock 来解决事件程序问题。

例如上图 A,B,C 初始的 clock 都是 0,当 C1 产生时就把本地 clock+1,并播送给其余节点,当其余节点 A,B 接管到 C 的的播送的信息时就应用 max(本地的 clock, 接管的 clock)尝试更新本地的 clock,这样能够了解为 A,B 之后的事件是预感了 C 所产生事件才产生的,所以前面产生事件的 clock 都会比接管到的 clock 大。
然而并不象征所有 clock 比拟大的事件就肯定在比拟小的事件前面产生:在图外面 B3 的 clock 就比 A2 小,然而 A2 的事件理论并不是在 B3 前面产生的(可能是同时,也可能在它之前),所以 Lamport Clock 存在肯定的局限性,并不能齐全能够从 clock 的大小就能推出所有事件间的程序,只能取得局部有序的事件序列。

Vector Clock

Vector Clock 是在 Lamport Clock 根底上扩大而来。Vector Clock 要求每个一个节点都有本人独立的 clock,当有本地事件产生时候只更新本人节点的 clock+1;而在播送音讯时要把节点整个 Vector Clock 传出去,当接管到音讯时须要遍历接管的 Vector Clock 上所有节点,并应用 max(节点的 clock,接管的节点 clock)来更新。
定义 Va,Vb 为事件 A 和 B 的 Vector Clock,再定义 Vb >= Va 为 Vb 蕴含每个节点 clock 都大于等于 Va 上每个节点 clock;那么就有 Vb >= Va 象征事件 B 会在 A 的前面产生,因为这样阐明 B 的事件产生的时候,是明确看到 A 的时间轴上的事件。

如上图,大部分 Vector Clock 都能持续找到对应的 >= 关系,然而在同样的 A2 和 B3 地位上,是不存在 A2 >= B3 也不存在 B3 >= A2 的关系的,这样就阐明这里可能存在并发的事件,无奈确定精确的程序,这也是合乎预期的,并不会产生 Lamport Clock 那样的问题。

Yjs 剖析

数据结构

Yjs 提供丰盛的数据结构 YArray, YMap, YText 等,而且数据结构间还能够组合一起,这样能够利用这些组成更加简单的数据结构十分不便容易融入不同利用场景中。
首先介绍外围构造 Item。

Item

作为 Yjs 最外围的构造,撑持其余所有构造的实现,所以如果可能把这个构造吃透,根本其余构造也是很容易了解。
Item 属性如下:

type Item = {// 由 [clientId, clock] 组成,CRDT 算法要求每个字符都调配一个 id,而 Yjs 为了节俭空间,会把雷同 clientId 且间断的字符串合并成一个 Item
    // 所以 Item 的 id 也就是第一个字符的 id,而通过 clock+ 字符串长度失去 Item 最初一个字符的 id
    id: ID, 
    left: Item | null,
    origin: ID | null,
    right: Item | null,
    ID: rightOrigin,
    parent: AbstractType<any>|ID|null,
    parentSub: string | null,
    content: AbstractContent,
    keep: boolean, // 用于 redo/undo 场景,防止 Item 过早回收
    info: number, // 状态标记位
}

origin 和 rightOrigin 指向的是 Item 插入时地位前后的字符的 ID,次要是用在解决抵触的时候用来维持原始用户操作用意;left,right 是 Item 的通过解决抵触后理论前后节点,所以 Item 是双向链表的构造,而且 left,right 在不会参加序列化传输的,只有 origin,rightOrigin 和 content 会被序列化传输过来;parent 指向父节点(YText),parentSub 当父节点内容是 YMap 时,parentSub 就是 key;而父节点内容是其余的时候就是为 null 了。content 就是 Item 理论承载的内容能够是 ContentString,ContentJSON,ContentDoc,ContentType 等等。

integrate 办法

integrate 办法是 Item 的外围办法,次要是通过待插入 Item 的 origin 和 rightOrigin,而后在 Item 链表外面寻找适宜的地位进行插入。
首先正如后面所说的,因为 left,right 属性是不跟在一起传输的;所以接管到内部操作的时就是应用 getMissing 办法,在本地 Item 链表下来寻找 origin 和 rightOrigin,作为初始的 left 和 right,如果找不到 origin 或 rightOrigin 的 Item 意味着依然有其余用户依赖的操作还没有接管,放到期待队列外面,等接管到必须的操作才进入解决抵触。

接着再看 integrate 办法,这个时候咱们须要解决 origin 和 rightOrigin 之间存在的节点抵触。
而 Yjs 应用的是本人一套解决抵触的办法,外围规定就是:

  1. orgin 之间的连线不能相交;
  2. 满足规定 1 后,clientId 小的节点排在右边;

红色连线都是节点的 origin 指向,当两根红线穿插的时候其实曾经阐明用户用意曾经被毁坏了;因为 C 的用意是紧跟 A 的前面,D 的用意是紧跟 B 的前面,而 B 用意是紧跟 A 前面,适当调整一下 C 的地位能够最大满足用户初始用意;尽管说 B 的用意最终没有被满足,然而多人合作中解决抵触只能尽可能达到最优,很多时候没有完满的解法。
而 integrate 办法理论解决状况还会多一些:

integrate (transaction, offset) {if (this.parent) {if ((!this.left && (!this.right || this.right.left !== null)) || (this.left && this.left.right !== this.right)) {
        let left = this.left
        let o
        if (left !== null) {o = left.right} else if (this.parentSub !== null) {o = (this.parent)._map.get(this.parentSub) || null
          while (o !== null && o.left !== null) {o = o.left}
        } else {o = (this.parent)._start
        }
        
        const conflictingItems = new Set()
     
        const itemsBeforeOrigin = new Set()
        // Let c in conflictingItems, b in itemsBeforeOrigin
        // ***{origin}bbbb{this}{c,b}{c,b}{o}***
        // Note that conflictingItems is a subset of itemsBeforeOrigin
        while (o !== null && o !== this.right) {itemsBeforeOrigin.add(o)
          conflictingItems.add(o)
          if (compareIDs(this.origin, o.origin)) {
            // 场景 1
            if (o.id.client < this.id.client) {
              left = o
              conflictingItems.clear()} else if (compareIDs(this.rightOrigin, o.rightOrigin) {break} 
          // 场景 3
          } else if (o.origin !== null && itemsBeforeOrigin.has(getItem(transaction.doc.store, o.origin))) {if (!conflictingItems.has(getItem(transaction.doc.store, o.origin))) {
              left = o
              conflictingItems.clear()}
          } else {
            // 场景 2
            break
          }
          o = o.right
        }
        this.left = left
      }
      if (this.left !== null) {
        const right = this.left.right
        this.right = right
        this.left.right = this
      } else {
        let r
        if (this.parentSub !== null) {r = (this.parent)._map.get(this.parentSub) || null
          while (r !== null && r.left !== null) {r = r.left}
        } else {r = (this.parent)._start; 
            (this.parent)._start = this
        }
        this.right = r
      }
      if (this.right !== null) {this.right.left = this} else if (this.parentSub !== null) {(this.parent)._map.set(this.parentSub, this)
        if (this.left !== null) {this.left.delete(transaction)
        }
      }
      // adjust length of parent
      if (this.parentSub === null && this.countable && !this.deleted) {(this.parent)._length += this.length
      }
      addStruct(transaction.doc.store, this)
      this.content.integrate(transaction, this)
      // add parent to transaction.changed
      addChangedTypeToTransaction(transaction, (this.parent), this.parentSub)
      if (((this.parent)._item !== null && /** @type {AbstractType<any>} */ (this.parent)._item.deleted) || (this.parentSub !== null && this.right !== null)) {this.delete(transaction)
      }
    } else {
      // 如果没有 parent,应用 GC 构造 integrate
      new GC(this.id, this.length).integrate(transaction, 0)
    }
}

如果单看代码感觉容易一头雾水,列一下场景会容易了解一些:

  1. 场景 1:抵触的节点 Origin 都雷同

    比较简单,E 只跟 {B, C} 有抵触,而且他们之间的 Origin 连线也不会相交(因为他们的 Origin 都是同一个),所以只须要比照他们的 clientId 就能够找到适当的地位

  2. 场景 2:抵触节点 Origin 不一样,导致 Origin 连线相交

    这个时候 E 跟 C 有抵触然而 C 的 Origin 跟 E 的 Origin 不一样,E 是不能跟在 C 前面因为这样 E 的连线就会相交,所以 E 只能放在 C 后面

  3. 场景 3: 抵触的节点 Origin 不雷同,然而 Origin 也是指向之前的跳过的抵触节点

    E 跟 B 互相冲突,然而 B 因为 clientId 比拟小所以应该排在 E 的右边,所以 conflictingItems.clear()会先革除跳过的抵触节点;接着遍历到 {C,D}
    的时候会发现他们的 Origin 指向的是之前跳过的 B 节点,所以也让他们一起跳过,最初发现 D 前面的地位是适合的。

到此外围局部曾经解释实现,这个时候新的节点也找到本人对应的 left 和 right。

delete 办法

Yjs 的删除操作采纳的是很多 CRDT 算法所采纳的墓碑形式:即不做任何理论删除操作,只是仅仅做一个标记,这也是很多人对 CRDT 担心的意味着随着操作减少空间占用也会越来越大,性能也会越来越低。

delete (transaction) {if (!this.deleted) {const parent = /** @type {AbstractType<any>} */ (this.parent)
      // adjust the length of parent
      if (this.countable && this.parentSub === null) {parent._length -= this.length}
      this.markDeleted()
      addToDeleteSet(transaction.deleteSet, this.id.client, this.id.clock, this.length)
      addChangedTypeToTransaction(transaction, parent, this.parentSub)
      this.content.delete(transaction)
    }
  }
export const addToDeleteSet = (ds, client, clock, length) => {map.setIfUndefined(ds.clients, client, () => /** @type {Array<DeleteItem>} */ ([])).push(new DeleteItem(clock, length))
}

删除的 Item 最终会转变成 DeleteItem 退出到 transaction 的 deleteSet 中,而后会跟着序列化传输到其余用户上。
同样当接管到反序列化后的 deleteSet,发现如果有其余操作还没接管到就会放到 store 的 pendingDs 期待前面接管到新的操作再进行解决。

gc 办法

刚刚也提到所有 delete 的节点只会标记一下,并不会实在的解决,然而 Yjs 也提供了一个 GC 的办法来回收内存。

 gc (store, parentGCd) {if (!this.deleted) {throw error.unexpectedCase()
    }
    this.content.gc(store)
    if (parentGCd) {replaceStruct(store, this, new GC(this.id, this.length))
    } else {this.content = new ContentDeleted(this.length)
    }
  }

gc 的办法就会把内容替换成 ContentDelete,Yjs 在每次 transaction 完结时都会触发一次 tryGcDeleteSet(ds, store, doc.gcFilter)

const tryGcDeleteSet = (ds, store, gcFilter) => {for (const [client, deleteItems] of ds.clients.entries()) {const structs = /** @type {Array<GC|Item>} */ (store.clients.get(client))
    for (let di = deleteItems.length - 1; di >= 0; di--) {const deleteItem = deleteItems[di]
      const endDeleteItemClock = deleteItem.clock + deleteItem.len
      for (let si = findIndexSS(structs, deleteItem.clock), struct = structs[si];
        si < structs.length && struct.id.clock < endDeleteItemClock;
        struct = structs[++si]
      ) {const struct = structs[si]
        if (deleteItem.clock + deleteItem.len <= struct.id.clock) {break}
        if (struct instanceof Item && struct.deleted && !struct.keep && gcFilter(struct)) {struct.gc(store, false)
        }
      }
    }
  }
}

默认的 gcFilter 都是返回 true,所以默认状况下删除的 Item 都会被 gc 掉。

YText

YText 是须要重点摸索的一个数据结构,Yjs 对于文本操作只提供插入,删除和格式化三个操作。

插入文本

从 insert 办法开始:

 insert (index, text, attributes) {if (text.length <= 0) {return}
    const y = this.doc
    if (y !== null) {
      transact(y, transaction => {const pos = findPosition(transaction, this, index)
        if (!attributes) {attributes = {}
          // @ts-ignore
          pos.currentAttributes.forEach((v, k) => {attributes[k] = v })
        }
        insertText(transaction, this, pos, text, attributes)
      })
    } else {/** @type {Array<function>} */ (this._pending).push(() => this.insert(index, text, attributes))
    }
  }

首先 Yjs 的所有操作都是在一个 transaction 外面进行的,在 transaction 外面先通过 findPosition 找到文本开始插入的 Item 地位,如果插入的地位在 Item 的两头就须要 splitItem 决裂成两个 Item。这个遍历是从 Item 列表一开始查找遍历的所以是一个 O(n)的操作,然而 Yjs 会通过保留最近操作的地位来放慢搜寻速度,而且在搜寻的过程中也要计算文本的格局属性,文本的每一个属性也会生成一个 Item,它的 content 会是 ContentFormat(次要是一个 key value 的键值对)。

如同上图所示,”BBB” 的色彩是红色,然而后续的会被前面的格局笼罩,”aaa” 的色彩就会变成红色。
你很快会发现这种形式对操作格式化其实挺简单的;当初要插入 ”BBB” 和 ”aaa” 插入带有斜体的 ”EEE” 的字符串,就得在插入 ”EEE” 前面得加上斜体革除的格局。

其实能够设想无论减少还是删除文本的时候要革除它的格局都会异样麻烦。

const insertText = (transaction, parent, currPos, text, attributes) => {currPos.currentAttributes.forEach((_val, key) => {if (attributes[key] === undefined) {attributes[key] = null
    }
  })
  const doc = transaction.doc
  const ownClientId = doc.clientID
  minimizeAttributeChanges(currPos, attributes)
  // 1
  const negatedAttributes = insertAttributes(transaction, parent, currPos, attributes)
  // insert content
  const content = text.constructor === String ? new ContentString(/** @type {string} */ (text)) : (text instanceof AbstractType ? new ContentType(text) : new ContentEmbed(text))
  let {left, right, index} = currPos
  if (parent._searchMarker) {updateMarkerChanges(parent._searchMarker, currPos.index, content.getLength())
  }
  // 2
  right = new Item(createID(ownClientId, getState(doc.store, ownClientId)), left, left && left.lastId, right, right && right.id, parent, null, content)
  // 3
  right.integrate(transaction, 0)
  currPos.right = right
  currPos.index = index
  currPos.forward()
  // 4
  insertNegatedAttributes(transaction, parent, currPos, negatedAttributes)
}

代码上的 1 和 4 两个中央就是代表插入属性和插入打消属性两个操作。而后 2,3 步就是插入新的节点;
最初进入 transaction 清理阶段:

export const transact = (doc, f, origin = null, local = true) => {
  const transactionCleanups = doc._transactionCleanups
  let initialCall = false
  /**
   * @type {any}
   */
  let result = null
  if (doc._transaction === null) {
    initialCall = true
    doc._transaction = new Transaction(doc, origin, local)
    transactionCleanups.push(doc._transaction)
    if (transactionCleanups.length === 1) {doc.emit('beforeAllTransactions', [doc])
    }
    doc.emit('beforeTransaction', [doc._transaction, doc])
  }
  try {result = f(doc._transaction)
  } finally {if (initialCall) {const finishCleanup = doc._transaction === transactionCleanups[0]
      doc._transaction = null
      if (finishCleanup) {cleanupTransactions(transactionCleanups, 0)
      }
    }
  }
  return result
}

简略阐明一下整个 transaction 过程:

  1. transaction 初始化时先计算 beforeState (StateVector)
  2. 业务操作
  3. 清理阶段
    1)计算 afterState(StateVector)用于跟 beforeState 比照,找出前后的差别
    2)尝试合并 deleteSet(让传输信息体积更小)
    3)触发 YText,YArray 等构造上的事件监听器;相似在 YText 上会遍历整个 Item 链表找出有哪些节点是改变或者减少的,聚合到事件上
    4)尝试回收被标记删除的 Item 节点
    5)尝试合并曾经标记删除的 Item 节点(让 Item 链表节点更少,占用内存更少)
    6)尝试合并新增加的 Item 节点
    7) 把新减少的 Item 节点和 deleteSet 写入 UpdateMessage,而后触发文档上的 update 事件
    而后就能够把 UpdateMessage 发送给其余用户节点了
删除文本

删除文本自身解决是很简略的,只须要找到对应文本的 Item 节点而后调用它的 delete 办法即可,然而还要清理文本的格局就很麻烦了,须要小心解决不能革除会影响后续文本格式的属性。

const deleteText = (transaction, currPos, length) => {
  const startLength = length
  const startAttrs = map.copy(currPos.currentAttributes)
  const start = currPos.right
  while (length > 0 && currPos.right !== null) {if (currPos.right.deleted === false) {switch (currPos.right.content.constructor) {
        case ContentType:
        case ContentEmbed:
        case ContentString:
          if (length < currPos.right.length) {getItemCleanStart(transaction, createID(currPos.right.id.client, currPos.right.id.clock + length))
          }
          length -= currPos.right.length
          // 只须要调用 delete 办法
          currPos.right.delete(transaction)
          break
      }
    }
    currPos.forward()}
  if (start) {
    // 清理文本格式
    cleanupFormattingGap(transaction, start, currPos.right, startAttrs, currPos.currentAttributes)
  }
  const parent = /** @type {AbstractType<any>} */ (/** @type {Item} */ (currPos.left || currPos.right).parent)
  if (parent._searchMarker) {updateMarkerChanges(parent._searchMarker, currPos.index, -startLength + length)
  }
  return currPos
}

cleanupFormattingGap 办法就是清理被删除文本的格局,须要步骤:

  1. 先计算被删除文本前面的第一段文本的所继承的格局汇合
  2. 再回头遍历一次,找出不在后面计算的汇合外面的 ContentFormat 节点进行删除
格式化文本

正如之前所提到的操作文本格式是异样麻烦的,如果要格式化一段文本,同样须要以下步骤:

  1. 找出须要格式化的文本的开始节点地位
  2. 计算后面继承来的格局汇合与行将插入的格局汇合的差汇合,依据计算的汇合别离插入对应的 ContentFormat 节点
  3. 而后遍历要格式化文本外面的所有节点,把不一样的 ContentFormat 节点删除,同时收集会影响前面文本的格局汇合
  4. 把收集的受影响格局从新插入到被删文本的节点前面

YArray

YArray 属于简化版本的 YText,略过。

YMap

set

YMap 每个 Key 都指向一个 Item 双向链表,跟 YText 不一样的是,Key 永远指向这个链表的最初一个节点,而其余节点都会标记为删除。

export const typeMapSet = (transaction, parent, key, value) => {
  // 获取最初的节点
  const left = parent._map.get(key) || null
  const doc = transaction.doc
  const ownClientId = doc.clientID
  let content
  if (value == null) {content = new ContentAny([value])
  } else {switch (value.constructor) {
      case Number:
      case Object:
      case Boolean:
      case Array:
      case String:
        content = new ContentAny([value])
        break
      case Uint8Array:
        content = new ContentBinary(/** @type {Uint8Array} */ (value))
        break
      case Doc:
        content = new ContentDoc(/** @type {Doc} */ (value))
        break
      default:
        if (value instanceof AbstractType) {content = new ContentType(value)
        } else {throw new Error('Unexpected content type')
        }
    }
  }
  // 每次都会新建一个节点,而这个节点也会插入到链表的最初面
  new Item(createID(ownClientId, getState(doc.store, ownClientId)), left, left && left.lastId, null, null, parent, key, content).integrate(transaction, 0)
}
get/delete

get/delete 都间接读取 / 删除最初一个节点即可

同步交互过程

Yjs 次要通过两个协定来实现各个节点间的同步交互:

  1. AwarenessProtocol
    用于节点发现,更新和离线事件感知。

    AwarenessProtocol 通过解包所有来往 AwarenessMessage 并记录每个 Client 的状态,如果发现没有记录的 Client 就退出到 Client Add List 外面,其余状况也如此类推。
    对于用户离线的判断,AwarenessProtol 次要靠两种形式:

    1. 接管到节点发送 state 为 null 的信息
    2. 超过 30s 没有接管到用户的信息
  2. SyncProtocol
    用于节点与节点间状态同步,文档形容整个过程理论只有两步,然而还是依据 y -websocket 把整个过程出现进去比拟直观:

    对于图中提到的 LocalState 和 AwarenessState 辨别:

    LocalState 实质上是各个节点对文档的操作序列,AwarenessState 则是自定义一些状态。

undo/redo

Yjs 的 undo/redo 有两种可选计划:

  1. 基于编辑器的 undo/redo
    这种计划次要针对富文本编辑器这里自带操作历史记录治理,只有 undo/redo 时候 Yjs 跟着利用对应操作即可,Yjs 自身无需做额定解决。
  2. 基于 Yjs 本身的 UndoManager
    UndoManager 能够针对更加简单的利用场景,自身 Yjs 提供的数据能够自由组合齐全能够脱离富文本这种场景应用,而且 UndoManager 能够只针对组合构造外面任一子结构进行治理,灵活性大很多。

然而总结两种计划对于 undo/redo 都会影响全局的其余用户,也就是象征不能独自只 undo/redo 本身操作(个别这种更合乎应用体验),如果须要这种成果须要做更进一步的解决。

总结

基于 Yjs 的代码剖析到此为止了,期待有一天可能利用到理论我的项目中。

参考

CRDT 简介
An introduction to Conflict-Free Replicated Data Types
Collaborative Editing in CodeMirror
How Figma’s multiplayer technology works
Realtime Editing of Ordered Sequences
Are CRDTs suitable for shared editing?
Lamport timestamp
Vector clock
On Consistency of Operational Transformation Approach
I was wrong. CRDTs are the future

正文完
 0