前文回顾
- 实现一个简略的 Database1(译文)
- 实现一个简略的 Database2(译文)
- 实现一个简略的 Database3(译文)
- 实现一个简略的 Database4(译文)
- 实现一个简略的 Database5(译文)
- 实现一个简略的 Database6(译文)
- 实现一个简略的 Database7(译文)
- 实现一个简略的 Database8(译文)
- 实现一个简略的 Database9(译文)
- 实现一个简略的 Database10(译文)
- 实现一个简略的 Database11(译文)
- 实现一个简略的 Database12(译文)
译注:cstack 在 github 保护了一个简略的、相似 sqlite 的数据库实现,通过这个简略的我的项目,能够很好的了解数据库是如何运行的。本文是第十三篇,次要是在节点决裂后更新父节点
Part 13 决裂后 更新 父节点
对于咱们史诗般的 B 树实现之旅的下一步,咱们将解决,在叶子节点决裂后修复父节点(也就是更新父结点中的条目信息)。我来用上面的例子作为参考:
Example of updating internal node
在这例子中,减少一个 Key “3” 到树中。这会引起左侧叶子节点决裂(假如节点中可寄存两个条目,超过三个将引起决裂)。在决裂后做一下几个步骤来修复树(的构造):
1、用左侧叶子节点(left child)中最大 Key (“3”) 来更新父节点中的第一个 Key。
2、在更新 Key 后增加一个新的指针和一个键对儿:
- 新指针指向新的孩子节点(也是叶子节点)
- 新减少的键对儿是新的孩子节点中的最大的 Key (“5”)
所以,首先得得首先,应用两个新函数替换之前的埋桩代码:
update_internal_node_key() for step 1 and internal_node_insert() for step 2
@@ -670,9 +725,11 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
*/
void* old_node = get_page(cursor->table->pager, cursor->page_num);
+ uint32_t old_max = get_node_max_key(old_node);
uint32_t new_page_num = get_unused_page_num(cursor->table->pager);
void* new_node = get_page(cursor->table->pager, new_page_num);
initialize_leaf_node(new_node);
+ *node_parent(new_node) = *node_parent(old_node);
*leaf_node_next_leaf(new_node) = *leaf_node_next_leaf(old_node);
*leaf_node_next_leaf(old_node) = new_page_num;
@@ -709,8 +766,12 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {if (is_node_root(old_node)) {return create_new_root(cursor->table, new_page_num);
} else {- printf("Need to implement updating parent after split\n");
- exit(EXIT_FAILURE);
+ uint32_t parent_page_num = *node_parent(old_node);
+ uint32_t new_max = get_node_max_key(old_node);
+ void* parent = get_page(cursor->table->pager, parent_page_num);
+
+ update_internal_node_key(parent, old_max, new_max);
+ internal_node_insert(cursor->table, parent_page_num, new_page_num);
+ return;
}
}
为了获取对父节点的援用,咱们须要开始在每个节点中记录指向其父节点的指针。
+uint32_t* node_parent(void* node) {return node + PARENT_POINTER_OFFSET;}
@@ -660,6 +675,48 @@ void create_new_root(Table* table, uint32_t right_child_page_num) {uint32_t left_child_max_key = get_node_max_key(left_child);
*internal_node_key(root, 0) = left_child_max_key;
*internal_node_right_child(root) = right_child_page_num;
+ *node_parent(left_child) = table->root_page_num;
+ *node_parent(right_child) = table->root_page_num;
}
当初,须要找到在父节点中被影响到的单元格。子节点不晓得它领有的 page number,所以咱们不能不能查找到。然而子节点晓得本人存有的最大的 Key,所以咱们能在父节点中找到这个 Key。
+void update_internal_node_key(void* node, uint32_t old_key, uint32_t new_key) {+ uint32_t old_child_index = internal_node_find_child(node, old_key);
+ *internal_node_key(node, old_child_index) = new_key;
}
在 internal_node_find_child()函数外部,咱们重用了一些咱们为了在外部节点查找一个 Key 代码。重构 internal_node_find()函数应用新的 helper 办法。
-Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {- void* node = get_page(table->pager, page_num);
+uint32_t internal_node_find_child(void* node, uint32_t key) {
+ /*
+ Return the index of the child which should contain
+ the given key.
+ */
+
uint32_t num_keys = *internal_node_num_keys(node);
- /* Binary search to find index of child to search */
+ /* Binary search */
uint32_t min_index = 0;
uint32_t max_index = num_keys; /* there is one more child than key */
@@ -386,7 +394,14 @@ Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {}}
- uint32_t child_num = *internal_node_child(node, min_index);
+ return min_index;
+}
+
+Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {+ void* node = get_page(table->pager, page_num);
+
+ uint32_t child_index = internal_node_find_child(node, key);
+ uint32_t child_num = *internal_node_child(node, child_index);
void* child = get_page(table->pager, child_num);
switch (get_node_type(child)) {case NODE_LEAF:
当初咱们进入了本文的外围议题,实现 internal_node_insert() 函数。我将分批来解释(这个函数)。
+void internal_node_insert(Table* table, uint32_t parent_page_num,
+ uint32_t child_page_num) {
+ /*
+ Add a new child/key pair to parent that corresponds to child
+ */
+
+ void* parent = get_page(table->pager, parent_page_num);
+ void* child = get_page(table->pager, child_page_num);
+ uint32_t child_max_key = get_node_max_key(child);
+ uint32_t index = internal_node_find_child(parent, child_max_key);
+
+ uint32_t original_num_keys = *internal_node_num_keys(parent);
+ *internal_node_num_keys(parent) = original_num_keys + 1;
+
+ if (original_num_keys >= INTERNAL_NODE_MAX_CELLS) {+ printf("Need to implement splitting internal node\n");
+ exit(EXIT_FAILURE);
+ }
索引须要插入的新单元格(子节点指针或者键值对儿)地位是依据新产生的子节点中的最大的 Key 决定的。在例子中咱们看到,child_max_key 值为 5 并且索引值为 1。
如果在外部节点没有空间来寄存一个单元格了,那么先抛出一个谬误。这个(外部节点决裂)稍后再实现它。
当初来看一下函数的:
+
+ uint32_t right_child_page_num = *internal_node_right_child(parent);
+ void* right_child = get_page(table->pager, right_child_page_num);
+
+ if (child_max_key > get_node_max_key(right_child)) {
+ /* Replace right child */
+ *internal_node_child(parent, original_num_keys) = right_child_page_num;
+ *internal_node_key(parent, original_num_keys) =
+ get_node_max_key(right_child);
+ *internal_node_right_child(parent) = child_page_num;
+ } else {
+ /* Make room for the new cell */
+ for (uint32_t i = original_num_keys; i > index; i--) {+ void* destination = internal_node_cell(parent, i);
+ void* source = internal_node_cell(parent, i - 1);
+ memcpy(destination, source, INTERNAL_NODE_CELL_SIZE);
+ }
+ *internal_node_child(parent, index) = child_page_num;
+ *internal_node_key(parent, index) = child_max_key;
+ }
+}
因为咱们寄存最右子节点指针和其余子节点指针 / 键值对儿是离开存储的,如果新的子节点变成了最右子节点,们就必须以不同的形式解决了。
在咱们的代码例子中,会进入 else 模块。首先,将其余单元格向右挪动一个空格,为新单元格腾出空间。(只管在咱们例子中有 0 个单元格挪动)
接下来,将新的子节点指针和键值写入由索引确定的单元格中。
为了缩小测试 case 的大小,我当初对 INTERNAL_NODE_MAX_CELLS 进行硬编码。
@@ -126,6 +126,8 @@ const uint32_t INTERNAL_NODE_KEY_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_CHILD_SIZE = sizeof(uint32_t);
const uint32_t INTERNAL_NODE_CELL_SIZE =
INTERNAL_NODE_CHILD_SIZE + INTERNAL_NODE_KEY_SIZE;
+/* Keep this small for testing */
+const uint32_t INTERNAL_NODE_MAX_CELLS = 3;
说到测试,咱们的大数据集测试通过旧的存根并进入新的存根:
@@ -65,7 +65,7 @@ describe 'database' do
result = run_script(script)
expect(result.last(2)).to match_array([
"db > Executed.",
- "db > Need to implement updating parent after split",
+ "db > Need to implement splitting internal node",
])
十分令人满意,我晓得。
我增加另一个测试来打印四节点 tree。正好咱们测例更多时测试程序 id,这个测试会以伪随机程序增加记录。
+ it 'allows printing out the structure of a 4-leaf-node btree' do
+ script = [
+ "insert 18 user18 person18@example.com",
+ "insert 7 user7 person7@example.com",
+ "insert 10 user10 person10@example.com",
+ "insert 29 user29 person29@example.com",
+ "insert 23 user23 person23@example.com",
+ "insert 4 user4 person4@example.com",
+ "insert 14 user14 person14@example.com",
+ "insert 30 user30 person30@example.com",
+ "insert 15 user15 person15@example.com",
+ "insert 26 user26 person26@example.com",
+ "insert 22 user22 person22@example.com",
+ "insert 19 user19 person19@example.com",
+ "insert 2 user2 person2@example.com",
+ "insert 1 user1 person1@example.com",
+ "insert 21 user21 person21@example.com",
+ "insert 11 user11 person11@example.com",
+ "insert 6 user6 person6@example.com",
+ "insert 20 user20 person20@example.com",
+ "insert 5 user5 person5@example.com",
+ "insert 8 user8 person8@example.com",
+ "insert 9 user9 person9@example.com",
+ "insert 3 user3 person3@example.com",
+ "insert 12 user12 person12@example.com",
+ "insert 27 user27 person27@example.com",
+ "insert 17 user17 person17@example.com",
+ "insert 16 user16 person16@example.com",
+ "insert 13 user13 person13@example.com",
+ "insert 24 user24 person24@example.com",
+ "insert 25 user25 person25@example.com",
+ "insert 28 user28 person28@example.com",
+ ".btree",
+ ".exit",
+ ]
+ result = run_script(script)
原样是输入如下的样子:
- internal (size 3)
- leaf (size 7)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- key 1
- leaf (size 8)
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- key 15
- leaf (size 7)
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- key 22
- leaf (size 8)
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
db >
仔细观察,你会发现一个 bug:
- 5
- 6
- 7
- key 1
key 在这个地位应该输入 7,不是 1!
通过一堆的调试,我发现这是因为蹩脚的指针算法导致的。
uint32_t* internal_node_key(void* node, uint32_t key_num) {- return internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
+ return (void*)internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
}
INTERNAL_NODE_CHILD_SIZE
值是 4。在这里我的想法是想给函数 internal_node_cell()
的后果加 4 个字节,然而从函数 internal_node_cell()
的返回值返回的 uint32_t 指针,它实际上是加了 4 sizeof(uint32_t) 个字节。我批改了这里,在计算之前,通过转换一个 void 来解决了这个问题。
留神!空指针上的指针计算不是 C 规范的一部分,可能不适用于您的编译器(参考:https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c/46238658#46238658)我未来可能会写一篇对于可移植性的文章,但我当初把我的空指针计算留在这里。
好了。迈向全面可操作的 btree 的实现又近又一步。下一步应该是拆分外部节点。(实际上后续作者迟迟没有持续 Orz)
有趣味能够参考原文:https://cstack.github.io/db_tutorial/
Enjoy GreatSQL :)
## 对于 GreatSQL
GreatSQL 是由万里数据库保护的 MySQL 分支,专一于晋升 MGR 可靠性及性能,反对 InnoDB 并行查问个性,是实用于金融级利用的 MySQL 分支版本。
相干链接:GreatSQL 社区 Gitee GitHub Bilibili
GreatSQL 社区:
社区博客有奖征稿详情:https://greatsql.cn/thread-100-1-1.html
技术交换群:
微信:扫码增加
GreatSQL 社区助手
微信好友,发送验证信息加群
。