前文回顾

  • 实现一个简略的Database系列

译注:cstack在github保护了一个简略的、相似sqlite的数据库实现,通过这个简略的我的项目,能够很好的了解数据库是如何运行的。本文是第八篇,次要是对B-tree的叶子节点格局的实现

Part 8 B-Tree叶子节点格局

咱们筹备把表的格局从非排序的数组格局行(rows)改成B-Tree。这是一个相当大变动,须要多个篇幅能力实现。在本文完结时,咱们将定义叶子节点的布局,反对插入键值对儿到单节点的B-Tree。然而首先,来回顾一下把数据结构(从数组array)切换到B-Tree的起因。

替换表格局

依据当初的格局(数组组织的行数据格式),每个page存储的只有行数据(没有元数据),所以空间应用上很有效率,十分节俭空间。插入操作也很快,因为它只是追加到表的结尾而已。然而,查问一个特定的行数据只能遍历整张表的数据能力搞定。并且如果想删除一行数据的话,咱们就只能挪动这条数据前面的所有数据能力填补因为删除数据产生的空洞。

如果把表数据依照数组的形式存储,然而放弃行按 id 排序,咱们就能够应用二分查找形式来查找一个特定的 id 。然而,插入操作会变得很慢,因为咱们不得不挪动大量行(rows)能力腾出空间插入数据。

相同,咱们采纳树的构造。每个在树中的节点可能蕴含的行数量可变,所以咱们不得不在节点中存储一些额定信息来追踪节点中蕴含了多少行数据。此外,所有外部节点还有存储开销,因为这些节点不存储任何行数据(只存储目录项用来做路由查找)。作为对保护一整个大文件计划的替换,(应用树的构造来组织行数据)咱们能够疾速执行插入、删除和查找。

非排序数组行数据排序数组行数据Tree of nodes
页内蕴含只有数据只有数据元数据, 主键, 数据
每页行数moremorefewer
插入O(1)O(n)O(log(n))
删除O(n)O(n)O(log(n))
id查找O(n)O(log(n))O(log(n))

节拍板的格局(Node Header Format)

叶子节点和外部节点有不同的布局格局。这里创立一个enum类型用来追踪节点的类型(node type):

+typedef enum { NODE_INTERNAL, NODE_LEAF } NodeType;

每个节点对应一个page。外部节点通过存储子节点的 page numbrer 来指向其子节点。btree 向 pager 申请特定的 page number ,而后将获取到的指针放到 page cache。数据页(page)是依照 page number 的程序一个接一个存储在数据库文件中的。

节点须要在它的头部(header,page的结尾地位)寄存一些元数据。每个节点存储它本人的节点类型,也就是它是不是根节点,还有一个指向父节点的指针(这个指针用来通过父节点来查找同一层级的其余节点,即兄弟节点)。我为每个节点的头部(node header)的字段(field)的大小和偏移地位定义了一些常量:

+/*+ * Common Node Header Layout+ */+const uint32_t NODE_TYPE_SIZE = sizeof(uint8_t);+const uint32_t NODE_TYPE_OFFSET = 0;+const uint32_t IS_ROOT_SIZE = sizeof(uint8_t);+const uint32_t IS_ROOT_OFFSET = NODE_TYPE_SIZE;+const uint32_t PARENT_POINTER_SIZE = sizeof(uint32_t);+const uint32_t PARENT_POINTER_OFFSET = IS_ROOT_OFFSET + IS_ROOT_SIZE;+const uint8_t COMMON_NODE_HEADER_SIZE =+    NODE_TYPE_SIZE + IS_ROOT_SIZE + PARENT_POINTER_SIZE;

叶子节点格局(Leaf Node Format)

除了这些常见头部字段(header fields),叶子节点须要存储它蕴含有多少单元格(cells)。每个单元格里寄存的是一个键值对儿。

+/*+ * Leaf Node Header Layout+ */+const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);+const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;+const uint32_t LEAF_NODE_HEADER_SIZE =+    COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;

叶子节点主体就是一个单元格的数组(一个个键值对儿排列为数组),每个单元格是一个key追随一个value(一个序列化的行)。

+/*+ * Leaf Node Body Layout+ */+const uint32_t LEAF_NODE_KEY_SIZE = sizeof(uint32_t);+const uint32_t LEAF_NODE_KEY_OFFSET = 0;+const uint32_t LEAF_NODE_VALUE_SIZE = ROW_SIZE;+const uint32_t LEAF_NODE_VALUE_OFFSET =+    LEAF_NODE_KEY_OFFSET + LEAF_NODE_KEY_SIZE;+const uint32_t LEAF_NODE_CELL_SIZE = LEAF_NODE_KEY_SIZE + LEAF_NODE_VALUE_SIZE;+const uint32_t LEAF_NODE_SPACE_FOR_CELLS = PAGE_SIZE - LEAF_NODE_HEADER_SIZE;+const uint32_t LEAF_NODE_MAX_CELLS =+    LEAF_NODE_SPACE_FOR_CELLS / LEAF_NODE_CELL_SIZE;

依据这些常量,上面是叶子节点布局格局看起来的样子:

咱们叶子节点的格局

在节点头部应用一整个字节来示意 boolean 类型值有点低效,然而这会让编写代码来拜访这些值变得不便。

也须要留神到在结尾处有一些空间节约。咱们在节点头部之后尽量多的来存储单元格,然而剩下的空间可能不足以寄存一个整个的单元格。这种状况咱们将其留空来防止在节点之间拆分单元格(一个单元格不能跨两个节点)。

拜访叶子节点(Accessing Leaf Node Fields)

拜访keys、values和元数据都波及到咱们刚刚定义的常量的指针算法。

+uint32_t* leaf_node_num_cells(void* node) {+  return node + LEAF_NODE_NUM_CELLS_OFFSET;+}++void* leaf_node_cell(void* node, uint32_t cell_num) {+  return node + LEAF_NODE_HEADER_SIZE + cell_num * LEAF_NODE_CELL_SIZE;+}++uint32_t* leaf_node_key(void* node, uint32_t cell_num) {+  return leaf_node_cell(node, cell_num);+}++void* leaf_node_value(void* node, uint32_t cell_num) {+  return leaf_node_cell(node, cell_num) + LEAF_NODE_KEY_SIZE;+}++void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }+

这些办法返回指向相干值的指针,因而它们既能够用作 getter,也能够用作 setter。

批改 Pager 和 Table Objects

即便页不是满的,每个节点也正好占用一个页的空间。这意味着咱们的 pager 不再须要读/写页的一部分(而是整存整取一整个页)。

-void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {+void pager_flush(Pager* pager, uint32_t page_num) {   if (pager->pages[page_num] == NULL) {     printf("Tried to flush null page\n");     exit(EXIT_FAILURE);@@ -242,7 +337,7 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {   }   ssize_t bytes_written =-      write(pager->file_descriptor, pager->pages[page_num], size);+      write(pager->file_descriptor, pager->pages[page_num], PAGE_SIZE);   if (bytes_written == -1) {     printf("Error writing: %d\n", errno);void db_close(Table* table) {  Pager* pager = table->pager;-  uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;-  for (uint32_t i = 0; i < num_full_pages; i++) {+  for (uint32_t i = 0; i < pager->num_pages; i++) {    if (pager->pages[i] == NULL) {      continue;    }-    pager_flush(pager, i, PAGE_SIZE);+    pager_flush(pager, i);    free(pager->pages[i]);    pager->pages[i] = NULL;  }-  // There may be a partial page to write to the end of the file-  // This should not be needed after we switch to a B-tree-  uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;-  if (num_additional_rows > 0) {-    uint32_t page_num = num_full_pages;-    if (pager->pages[page_num] != NULL) {-      pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);-      free(pager->pages[page_num]);-      pager->pages[page_num] = NULL;-    }-  }-  int result = close(pager->file_descriptor);  if (result == -1) {    printf("Error closing db file.\n");

当初,在数据库中存储 page number 比存储行数更有意义。page number 须要与 pager 对象相关联,而不是与表关联,因为这是数据库用到的页的数量,而不是特定的表用到的。btree 是通过根节点的 page number 来辨认的,所以表对象须要跟踪 page number。

const uint32_t PAGE_SIZE = 4096;const uint32_t TABLE_MAX_PAGES = 100;-const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;-const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES;typedef struct {  int file_descriptor;  uint32_t file_length;+  uint32_t num_pages;  void* pages[TABLE_MAX_PAGES];} Pager;typedef struct {  Pager* pager;-  uint32_t num_rows;+  uint32_t root_page_num;} Table;@@ -127,6 +200,10 @@ void* get_page(Pager* pager, uint32_t page_num) {     }     pager->pages[page_num] = page;++    if (page_num >= pager->num_pages) {+      pager->num_pages = page_num + 1;+    }   }   return pager->pages[page_num];@@ -184,6 +269,12 @@ Pager* pager_open(const char* filename) {   Pager* pager = malloc(sizeof(Pager));   pager->file_descriptor = fd;   pager->file_length = file_length;+  pager->num_pages = (file_length / PAGE_SIZE);++  if (file_length % PAGE_SIZE != 0) {+    printf("Db file is not a whole number of pages. Corrupt file.\n");+    exit(EXIT_FAILURE);+  }   for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {     pager->pages[i] = NULL;

批改为Cursor Object

游标示意表中的一个地位。当咱们的表是一个简略的行数组时(array of rows),咱们应用给出的行号就能够拜访行数据。当初,表变成了一棵 btree,咱们通过节点的 page number 和节点内的单元格号(cell number)来辨认一个地位。

typedef struct {  Table* table;-  uint32_t row_num;+  uint32_t page_num;+  uint32_t cell_num;  bool end_of_table;  // Indicates a position one past the last element} Cursor;Cursor* table_start(Table* table) {  Cursor* cursor = malloc(sizeof(Cursor));  cursor->table = table;-  cursor->row_num = 0;-  cursor->end_of_table = (table->num_rows == 0);+  cursor->page_num = table->root_page_num;+  cursor->cell_num = 0;++  void* root_node = get_page(table->pager, table->root_page_num);+  uint32_t num_cells = *leaf_node_num_cells(root_node);+  cursor->end_of_table = (num_cells == 0);  return cursor;}Cursor* table_end(Table* table) {  Cursor* cursor = malloc(sizeof(Cursor));  cursor->table = table;-  cursor->row_num = table->num_rows;+  cursor->page_num = table->root_page_num;++  void* root_node = get_page(table->pager, table->root_page_num);+  uint32_t num_cells = *leaf_node_num_cells(root_node);+  cursor->cell_num = num_cells;  cursor->end_of_table = true;  return cursor;}void* cursor_value(Cursor* cursor) {-  uint32_t row_num = cursor->row_num;-  uint32_t page_num = row_num / ROWS_PER_PAGE;+  uint32_t page_num = cursor->page_num;  void* page = get_page(cursor->table->pager, page_num);-  uint32_t row_offset = row_num % ROWS_PER_PAGE;-  uint32_t byte_offset = row_offset * ROW_SIZE;-  return page + byte_offset;+  return leaf_node_value(page, cursor->cell_num);}void cursor_advance(Cursor* cursor) {-  cursor->row_num += 1;-  if (cursor->row_num >= cursor->table->num_rows) {+  uint32_t page_num = cursor->page_num;+  void* node = get_page(cursor->table->pager, page_num);++  cursor->cell_num += 1;+  if (cursor->cell_num >= (*leaf_node_num_cells(node))) {    cursor->end_of_table = true;  }}

叶子节点(Leaf Node)的插入操作

在本文中,咱们只是残缺的实现获取一个单节点的 btree。回忆一下上一篇文章内容,一棵 btree 是从一个空的叶子节点开始的:

空btree

键值对儿能够被插入到叶子节点中,直到叶子节点变满达到存储限度。

一个节点的btree

当咱们第一次关上数据库时(首次启动时),数据库文件是空的,没有任何用户数据,所以咱们初始化 page 0 为一个空的叶子节点(同时也是根节点)。

Table* db_open(const char* filename) {  Pager* pager = pager_open(filename);-  uint32_t num_rows = pager->file_length / ROW_SIZE;  Table* table = malloc(sizeof(Table));  table->pager = pager;-  table->num_rows = num_rows;+  table->root_page_num = 0;++  if (pager->num_pages == 0) {+    // New database file. Initialize page 0 as leaf node.+    void* root_node = get_page(pager, 0);+    initialize_leaf_node(root_node);+  }  return table;}

接下来咱们定义一个函数用来插入一个键值对儿到一个叶子节点中。它将采纳游标来作为须要插入键值对儿的地位的示意。

+void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {+  void* node = get_page(cursor->table->pager, cursor->page_num);++  uint32_t num_cells = *leaf_node_num_cells(node);+  if (num_cells >= LEAF_NODE_MAX_CELLS) {+    // Node full+    printf("Need to implement splitting a leaf node.\n");+    exit(EXIT_FAILURE);+  }++  if (cursor->cell_num < num_cells) {+    // Make room for new cell+    for (uint32_t i = num_cells; i > cursor->cell_num; i--) {+      memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),+             LEAF_NODE_CELL_SIZE);+    }+  }++  *(leaf_node_num_cells(node)) += 1;+  *(leaf_node_key(node, cursor->cell_num)) = key;+  serialize_row(value, leaf_node_value(node, cursor->cell_num));+}+

咱们还没有实现节点的决裂,所以临时在节点变满的时候先报错解决。接下来咱们把单元格(cells)向右挪动一个空间(one space)来给新的单元格让出地位。而后咱们在空地位写入键值对儿。

因为目前咱们是假如 btree 只有一个节点,咱们的 execute_insert() 函数只须要调用这个helper办法(调用leaf_node_insert()插入数据):

ExecuteResult execute_insert(Statement* statement, Table* table) {-  if (table->num_rows >= TABLE_MAX_ROWS) {+  void* node = get_page(table->pager, table->root_page_num);+  if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {    return EXECUTE_TABLE_FULL;  }  Row* row_to_insert = &(statement->row_to_insert);  Cursor* cursor = table_end(table);-  serialize_row(row_to_insert, cursor_value(cursor));-  table->num_rows += 1;+  leaf_node_insert(cursor, row_to_insert->id, row_to_insert);  free(cursor);

做了这些批改后,咱们的数据库就能像以前一样运行了。除了当初它会很快地返回“Table Full”的谬误,因为咱们当初还不能拆分根节点。

叶子节点能够存储多少行呢?

打印常量的命令

我减少了一个新的 meta command,来打印一些插入操作时的变量。

+void print_constants() {+  printf("ROW_SIZE: %d\n", ROW_SIZE);+  printf("COMMON_NODE_HEADER_SIZE: %d\n", COMMON_NODE_HEADER_SIZE);+  printf("LEAF_NODE_HEADER_SIZE: %d\n", LEAF_NODE_HEADER_SIZE);+  printf("LEAF_NODE_CELL_SIZE: %d\n", LEAF_NODE_CELL_SIZE);+  printf("LEAF_NODE_SPACE_FOR_CELLS: %d\n", LEAF_NODE_SPACE_FOR_CELLS);+  printf("LEAF_NODE_MAX_CELLS: %d\n", LEAF_NODE_MAX_CELLS);+}+@@ -294,6 +376,14 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {   if (strcmp(input_buffer->buffer, ".exit") == 0) {     db_close(table);     exit(EXIT_SUCCESS);+  } else if (strcmp(input_buffer->buffer, ".constants") == 0) {+    printf("Constants:\n");+    print_constants();+    return META_COMMAND_SUCCESS;   } else {     return META_COMMAND_UNRECOGNIZED_COMMAND;   }

我还加了一些测试,这样咱们就能在变量发生变化时获取报警信息。

+  it 'prints constants' do+    script = [+      ".constants",+      ".exit",+    ]+    result = run_script(script)++    expect(result).to match_array([+      "db > Constants:",+      "ROW_SIZE: 293",+      "COMMON_NODE_HEADER_SIZE: 6",+      "LEAF_NODE_HEADER_SIZE: 10",+      "LEAF_NODE_CELL_SIZE: 297",+      "LEAF_NODE_SPACE_FOR_CELLS: 4086",+      "LEAF_NODE_MAX_CELLS: 13",+      "db > ",+    ])+  end

通过测试,能够看出咱们的 table 当初能够存储 13 行数据。

Tree 可视化

为了有助于 debug 和可视化,我又减少了一个 meta command 来打印 btree 的示意模式。

+void print_leaf_node(void* node) {+  uint32_t num_cells = *leaf_node_num_cells(node);+  printf("leaf (size %d)\n", num_cells);+  for (uint32_t i = 0; i < num_cells; i++) {+    uint32_t key = *leaf_node_key(node, i);+    printf("  - %d : %d\n", i, key);+  }+}+@@ -294,6 +376,14 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {   if (strcmp(input_buffer->buffer, ".exit") == 0) {     db_close(table);     exit(EXIT_SUCCESS);+  } else if (strcmp(input_buffer->buffer, ".btree") == 0) {+    printf("Tree:\n");+    print_leaf_node(get_page(table->pager, 0));+    return META_COMMAND_SUCCESS;   } else if (strcmp(input_buffer->buffer, ".constants") == 0) {     printf("Constants:\n");     print_constants();     return META_COMMAND_SUCCESS;   } else {     return META_COMMAND_UNRECOGNIZED_COMMAND;   }

而后进行测试:

+  it 'allows printing out the structure of a one-node btree' do+    script = [3, 1, 2].map do |i|+      "insert #{i} user#{i} person#{i}@example.com"+    end+    script << ".btree"+    script << ".exit"+    result = run_script(script)++    expect(result).to match_array([+      "db > Executed.",+      "db > Executed.",+      "db > Executed.",+      "db > Tree:",+      "leaf (size 3)",+      "  - 0 : 3",+      "  - 1 : 1",+      "  - 2 : 2",+      "db > "+    ])+  end

啊哦,咱们还是没有依照排序的程序来存储行。你会留神到 execute_insert() 函数在叶子节点中插入时的地位就是 table_end() 函数返回的地位。所以数据是依照咱们插入的顺序存储的,这是跟以前一样的。

后续

这些(扭转)看起来像是倒退了一步。咱们的数据库当初存储的行数据量要比以前要少很多,并且咱们依然存储的是无序的行数据。然而就像我在开始说的,这是一个微小的变动,重要的是要将这些扭转合成为方便管理的几步来缓缓实现。

下一次,咱们将实现通过主键来查找一行记录的性能,并且开始让行数据存储变得有序。

代码变更比拟

@@ -62,29 +62,101 @@ const uint32_t ROW_SIZE = ID_SIZE + USERNAME_SIZE + EMAIL_SIZE; const uint32_t PAGE_SIZE = 4096; #define TABLE_MAX_PAGES 100-const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;-const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES; typedef struct {   int file_descriptor;   uint32_t file_length;+  uint32_t num_pages;   void* pages[TABLE_MAX_PAGES]; } Pager; typedef struct {   Pager* pager;-  uint32_t num_rows;+  uint32_t root_page_num; } Table; typedef struct {   Table* table;-  uint32_t row_num;+  uint32_t page_num;+  uint32_t cell_num;   bool end_of_table;  // Indicates a position one past the last element } Cursor;+typedef enum { NODE_INTERNAL, NODE_LEAF } NodeType;++/*+ * Common Node Header Layout+ */+const uint32_t NODE_TYPE_SIZE = sizeof(uint8_t);+const uint32_t NODE_TYPE_OFFSET = 0;+const uint32_t IS_ROOT_SIZE = sizeof(uint8_t);+const uint32_t IS_ROOT_OFFSET = NODE_TYPE_SIZE;+const uint32_t PARENT_POINTER_SIZE = sizeof(uint32_t);+const uint32_t PARENT_POINTER_OFFSET = IS_ROOT_OFFSET + IS_ROOT_SIZE;+const uint8_t COMMON_NODE_HEADER_SIZE =+    NODE_TYPE_SIZE + IS_ROOT_SIZE + PARENT_POINTER_SIZE;++/*+ * Leaf Node Header Layout+ */+const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);+const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;+const uint32_t LEAF_NODE_HEADER_SIZE =+    COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;++/*+ * Leaf Node Body Layout+ */+const uint32_t LEAF_NODE_KEY_SIZE = sizeof(uint32_t);+const uint32_t LEAF_NODE_KEY_OFFSET = 0;+const uint32_t LEAF_NODE_VALUE_SIZE = ROW_SIZE;+const uint32_t LEAF_NODE_VALUE_OFFSET =+    LEAF_NODE_KEY_OFFSET + LEAF_NODE_KEY_SIZE;+const uint32_t LEAF_NODE_CELL_SIZE = LEAF_NODE_KEY_SIZE + LEAF_NODE_VALUE_SIZE;+const uint32_t LEAF_NODE_SPACE_FOR_CELLS = PAGE_SIZE - LEAF_NODE_HEADER_SIZE;+const uint32_t LEAF_NODE_MAX_CELLS =+    LEAF_NODE_SPACE_FOR_CELLS / LEAF_NODE_CELL_SIZE;++uint32_t* leaf_node_num_cells(void* node) {+  return node + LEAF_NODE_NUM_CELLS_OFFSET;+}++void* leaf_node_cell(void* node, uint32_t cell_num) {+  return node + LEAF_NODE_HEADER_SIZE + cell_num * LEAF_NODE_CELL_SIZE;+}++uint32_t* leaf_node_key(void* node, uint32_t cell_num) {+  return leaf_node_cell(node, cell_num);+}++void* leaf_node_value(void* node, uint32_t cell_num) {+  return leaf_node_cell(node, cell_num) + LEAF_NODE_KEY_SIZE;+}++void print_constants() {+  printf("ROW_SIZE: %d\n", ROW_SIZE);+  printf("COMMON_NODE_HEADER_SIZE: %d\n", COMMON_NODE_HEADER_SIZE);+  printf("LEAF_NODE_HEADER_SIZE: %d\n", LEAF_NODE_HEADER_SIZE);+  printf("LEAF_NODE_CELL_SIZE: %d\n", LEAF_NODE_CELL_SIZE);+  printf("LEAF_NODE_SPACE_FOR_CELLS: %d\n", LEAF_NODE_SPACE_FOR_CELLS);+  printf("LEAF_NODE_MAX_CELLS: %d\n", LEAF_NODE_MAX_CELLS);+}++void print_leaf_node(void* node) {+  uint32_t num_cells = *leaf_node_num_cells(node);+  printf("leaf (size %d)\n", num_cells);+  for (uint32_t i = 0; i < num_cells; i++) {+    uint32_t key = *leaf_node_key(node, i);+    printf("  - %d : %d\n", i, key);+  }+}+ void print_row(Row* row) {     printf("(%d, %s, %s)\n", row->id, row->username, row->email); }@@ -101,6 +173,8 @@ void deserialize_row(void *source, Row* destination) {     memcpy(&(destination->email), source + EMAIL_OFFSET, EMAIL_SIZE); }+void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }+ void* get_page(Pager* pager, uint32_t page_num) {   if (page_num > TABLE_MAX_PAGES) {     printf("Tried to fetch page number out of bounds. %d > %d\n", page_num,@@ -128,6 +202,10 @@ void* get_page(Pager* pager, uint32_t page_num) {     }     pager->pages[page_num] = page;++    if (page_num >= pager->num_pages) {+      pager->num_pages = page_num + 1;+    }   }   return pager->pages[page_num];@@ -136,8 +214,12 @@ void* get_page(Pager* pager, uint32_t page_num) { Cursor* table_start(Table* table) {   Cursor* cursor = malloc(sizeof(Cursor));   cursor->table = table;-  cursor->row_num = 0;-  cursor->end_of_table = (table->num_rows == 0);+  cursor->page_num = table->root_page_num;+  cursor->cell_num = 0;++  void* root_node = get_page(table->pager, table->root_page_num);+  uint32_t num_cells = *leaf_node_num_cells(root_node);+  cursor->end_of_table = (num_cells == 0);   return cursor; }@@ -145,24 +227,28 @@ Cursor* table_start(Table* table) { Cursor* table_end(Table* table) {   Cursor* cursor = malloc(sizeof(Cursor));   cursor->table = table;-  cursor->row_num = table->num_rows;+  cursor->page_num = table->root_page_num;++  void* root_node = get_page(table->pager, table->root_page_num);+  uint32_t num_cells = *leaf_node_num_cells(root_node);+  cursor->cell_num = num_cells;   cursor->end_of_table = true;   return cursor; } void* cursor_value(Cursor* cursor) {-  uint32_t row_num = cursor->row_num;-  uint32_t page_num = row_num / ROWS_PER_PAGE;+  uint32_t page_num = cursor->page_num;   void* page = get_page(cursor->table->pager, page_num);-  uint32_t row_offset = row_num % ROWS_PER_PAGE;-  uint32_t byte_offset = row_offset * ROW_SIZE;-  return page + byte_offset;+  return leaf_node_value(page, cursor->cell_num); } void cursor_advance(Cursor* cursor) {-  cursor->row_num += 1;-  if (cursor->row_num >= cursor->table->num_rows) {+  uint32_t page_num = cursor->page_num;+  void* node = get_page(cursor->table->pager, page_num);++  cursor->cell_num += 1;+  if (cursor->cell_num >= (*leaf_node_num_cells(node))) {     cursor->end_of_table = true;   } }@@ -185,6 +271,12 @@ Pager* pager_open(const char* filename) {   Pager* pager = malloc(sizeof(Pager));   pager->file_descriptor = fd;   pager->file_length = file_length;+  pager->num_pages = (file_length / PAGE_SIZE);++  if (file_length % PAGE_SIZE != 0) {+    printf("Db file is not a whole number of pages. Corrupt file.\n");+    exit(EXIT_FAILURE);+  }   for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {     pager->pages[i] = NULL;@@ -194,11 +285,15 @@ Pager* pager_open(const char* filename) {@@ -195,11 +287,16 @@ Pager* pager_open(const char* filename) { Table* db_open(const char* filename) {   Pager* pager = pager_open(filename);-  uint32_t num_rows = pager->file_length / ROW_SIZE;   Table* table = malloc(sizeof(Table));   table->pager = pager;-  table->num_rows = num_rows;+  table->root_page_num = 0;++  if (pager->num_pages == 0) {+    // New database file. Initialize page 0 as leaf node.+    void* root_node = get_page(pager, 0);+    initialize_leaf_node(root_node);+  }   return table; }@@ -234,7 +331,7 @@ void close_input_buffer(InputBuffer* input_buffer) {     free(input_buffer); }-void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {+void pager_flush(Pager* pager, uint32_t page_num) {   if (pager->pages[page_num] == NULL) {     printf("Tried to flush null page\n");     exit(EXIT_FAILURE);@@ -242,7 +337,7 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {@@ -249,7 +346,7 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {   }   ssize_t bytes_written =-      write(pager->file_descriptor, pager->pages[page_num], size);+      write(pager->file_descriptor, pager->pages[page_num], PAGE_SIZE);   if (bytes_written == -1) {     printf("Error writing: %d\n", errno);@@ -252,29 +347,16 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) {@@ -260,29 +357,16 @@ void pager_flush(Pager* pager, uint32_t page_num, uint32_t size) { void db_close(Table* table) {   Pager* pager = table->pager;-  uint32_t num_full_pages = table->num_rows / ROWS_PER_PAGE;-  for (uint32_t i = 0; i < num_full_pages; i++) {+  for (uint32_t i = 0; i < pager->num_pages; i++) {     if (pager->pages[i] == NULL) {       continue;     }-    pager_flush(pager, i, PAGE_SIZE);+    pager_flush(pager, i);     free(pager->pages[i]);     pager->pages[i] = NULL;   }-  // There may be a partial page to write to the end of the file-  // This should not be needed after we switch to a B-tree-  uint32_t num_additional_rows = table->num_rows % ROWS_PER_PAGE;-  if (num_additional_rows > 0) {-    uint32_t page_num = num_full_pages;-    if (pager->pages[page_num] != NULL) {-      pager_flush(pager, page_num, num_additional_rows * ROW_SIZE);-      free(pager->pages[page_num]);-      pager->pages[page_num] = NULL;-    }-  }-   int result = close(pager->file_descriptor);   if (result == -1) {     printf("Error closing db file.\n");@@ -305,6 +389,14 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table *table) {   if (strcmp(input_buffer->buffer, ".exit") == 0) {     db_close(table);     exit(EXIT_SUCCESS);+  } else if (strcmp(input_buffer->buffer, ".btree") == 0) {+    printf("Tree:\n");+    print_leaf_node(get_page(table->pager, 0));+    return META_COMMAND_SUCCESS;+  } else if (strcmp(input_buffer->buffer, ".constants") == 0) {+    printf("Constants:\n");+    print_constants();+    return META_COMMAND_SUCCESS;   } else {     return META_COMMAND_UNRECOGNIZED_COMMAND;   }@@ -354,16 +446,39 @@ PrepareResult prepare_statement(InputBuffer* input_buffer,   return PREPARE_UNRECOGNIZED_STATEMENT; }+void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {+  void* node = get_page(cursor->table->pager, cursor->page_num);++  uint32_t num_cells = *leaf_node_num_cells(node);+  if (num_cells >= LEAF_NODE_MAX_CELLS) {+    // Node full+    printf("Need to implement splitting a leaf node.\n");+    exit(EXIT_FAILURE);+  }++  if (cursor->cell_num < num_cells) {+    // Make room for new cell+    for (uint32_t i = num_cells; i > cursor->cell_num; i--) {+      memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),+             LEAF_NODE_CELL_SIZE);+    }+  }++  *(leaf_node_num_cells(node)) += 1;+  *(leaf_node_key(node, cursor->cell_num)) = key;+  serialize_row(value, leaf_node_value(node, cursor->cell_num));+}+ ExecuteResult execute_insert(Statement* statement, Table* table) {-  if (table->num_rows >= TABLE_MAX_ROWS) {+  void* node = get_page(table->pager, table->root_page_num);+  if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {     return EXECUTE_TABLE_FULL;   }   Row* row_to_insert = &(statement->row_to_insert);   Cursor* cursor = table_end(table);-  serialize_row(row_to_insert, cursor_value(cursor));-  table->num_rows += 1;+  leaf_node_insert(cursor, row_to_insert->id, row_to_insert);   free(cursor);

上面是测试过程:

+  it 'allows printing out the structure of a one-node btree' do+    script = [3, 1, 2].map do |i|+      "insert #{i} user#{i} person#{i}@example.com"+    end+    script << ".btree"+    script << ".exit"+    result = run_script(script)++    expect(result).to match_array([+      "db > Executed.",+      "db > Executed.",+      "db > Executed.",+      "db > Tree:",+      "leaf (size 3)",+      "  - 0 : 3",+      "  - 1 : 1",+      "  - 2 : 2",+      "db > "+    ])+  end++  it 'prints constants' do+    script = [+      ".constants",+      ".exit",+    ]+    result = run_script(script)++    expect(result).to match_array([+      "db > Constants:",+      "ROW_SIZE: 293",+      "COMMON_NODE_HEADER_SIZE: 6",+      "LEAF_NODE_HEADER_SIZE: 10",+      "LEAF_NODE_CELL_SIZE: 297",+      "LEAF_NODE_SPACE_FOR_CELLS: 4086",+      "LEAF_NODE_MAX_CELLS: 13",+      "db > ",+    ])+  end end

Enjoy GreatSQL :)

## 对于 GreatSQL

GreatSQL是由万里数据库保护的MySQL分支,专一于晋升MGR可靠性及性能,反对InnoDB并行查问个性,是实用于金融级利用的MySQL分支版本。

相干链接: GreatSQL社区 Gitee GitHub Bilibili

GreatSQL社区:

社区博客有奖征稿详情:https://greatsql.cn/thread-10...

技术交换群:

微信:扫码增加GreatSQL社区助手微信好友,发送验证信息加群