GreatSQL社区

搜索

GreatSQL社区

实现一个简单Database13

GreatSQL社区 已有 1070 次阅读2023-5-24 18:45 |个人分类:技术分享|系统分类:其他


前文回顾


译注:cstack在github维护了一个简单的、类似sqlite的数据库实现,通过这个简单的项目,可以很好的理解数据库是如何运行的。本文是第十三篇,主要是在节点分裂后更新父节点

P**art 13 分裂后更新父节点**

对于我们史诗般的 B 树实现之旅的下一步,我们将处理,在叶子节点分裂后修复父节点(也就是更新父结点中的条目信息)。我来用下面的例子作为参考:

图片

Example of updating internal node

在这例子中,增加一个 Key "3" 到树中。这会引起左侧叶子节点分裂(假设节点中可存放两个条目,超过三个将引起分裂)。在分裂后做一下几个步骤来修复树(的结构):
1、用左侧叶子节点(left child)中最大 Key (“3”) 来更新父节点中的第一个Key。
2、在更新 Key 后添加一个新的指针和一个键对儿:

  • 新指针指向新的孩子节点(也是叶子节点)
  • 新增加的键对儿是新的孩子节点中的最大的 Key (“5”)

所以,首先得得首先,使用两个新函数替换之前的埋桩代码:
update_internal_node_key() for step 1 and internal_node_insert() for step 2

@@ -670,9 +725,11 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
   */

   void* old_node = get_page(cursor->table->pager, cursor->page_num);
+  uint32_t old_max = get_node_max_key(old_node);
   uint32_t new_page_num = get_unused_page_num(cursor->table->pager);
   void* new_node = get_page(cursor->table->pager, new_page_num);
   initialize_leaf_node(new_node);
+  *node_parent(new_node) = *node_parent(old_node);
   *leaf_node_next_leaf(new_node) = *leaf_node_next_leaf(old_node);
   *leaf_node_next_leaf(old_node) = new_page_num;

@@ -709,8 +766,12 @@ void leaf_node_split_and_insert(Cursor* cursor, uint32_t key, Row* value) {
   if (is_node_root(old_node)) {
     return create_new_root(cursor->table, new_page_num);
   } else {
-    printf("Need to implement updating parent after split\n");
-    exit(EXIT_FAILURE);
+    uint32_t parent_page_num = *node_parent(old_node);
+    uint32_t new_max = get_node_max_key(old_node);
+    void* parent = get_page(cursor->table->pager, parent_page_num);
+
+    update_internal_node_key(parent, old_max, new_max);
+    internal_node_insert(cursor->table, parent_page_num, new_page_num);
+    return;
   }
 }

为了获取对父节点的引用,我们需要开始在每个节点中记录指向其父节点的指针。

+uint32_t* node_parent(void* node) { return node + PARENT_POINTER_OFFSET; }
@@ -660,6 +675,48 @@ void create_new_root(Table* table, uint32_t right_child_page_num) {
   uint32_t left_child_max_key = get_node_max_key(left_child);
   *internal_node_key(root, 0) = left_child_max_key;
   *internal_node_right_child(root) = right_child_page_num;
+  *node_parent(left_child) = table->root_page_num;
+  *node_parent(right_child) = table->root_page_num;
 }

现在,需要找到在父节点中被影响到的单元格。子节点不知道它拥有的 page number ,所以我们不能不能查找到。但是子节点知道自己存有的最大的 Key ,所以我们能在父节点中找到这个 Key 。

+void update_internal_node_key(void* node, uint32_t old_key, uint32_t new_key) {
+  uint32_t old_child_index = internal_node_find_child(node, old_key);
+  *internal_node_key(node, old_child_index) = new_key;
 }

在internal_node_find_child()函数内部,我们重用了一些我们为了在内部节点查找一个 Key 代码。重构internal_node_find()函数使用新的 helper 方法。

-Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {
-  void* node = get_page(table->pager, page_num);
+uint32_t internal_node_find_child(void* node, uint32_t key) {
+  /*
+  Return the index of the child which should contain
+  the given key.
+  */
+
   uint32_t num_keys = *internal_node_num_keys(node);

-  /* Binary search to find index of child to search */
+  /* Binary search */
   uint32_t min_index = 0;
   uint32_t max_index = num_keys; /* there is one more child than key */

@@ -386,7 +394,14 @@ Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {
     }
   }

-  uint32_t child_num = *internal_node_child(node, min_index);
+  return min_index;
+}
+
+Cursor* internal_node_find(Table* table, uint32_t page_num, uint32_t key) {
+  void* node = get_page(table->pager, page_num);
+
+  uint32_t child_index = internal_node_find_child(node, key);
+  uint32_t child_num = *internal_node_child(node, child_index);
   void* child = get_page(table->pager, child_num);
   switch (get_node_type(child)) {
     case NODE_LEAF:

现在我们进入了本文的核心议题,实现 internal_node_insert() 函数。我将分批来解释(这个函数)。

+void internal_node_insert(Table* table, uint32_t parent_page_num,
+                          uint32_t child_page_num) {
+  /*
+  Add a new child/key pair to parent that corresponds to child
+  */
+
+  void* parent = get_page(table->pager, parent_page_num);
+  void* child = get_page(table->pager, child_page_num);
+  uint32_t child_max_key = get_node_max_key(child);
+  uint32_t index = internal_node_find_child(parent, child_max_key);
+
+  uint32_t original_num_keys = *internal_node_num_keys(parent);
+  *internal_node_num_keys(parent) = original_num_keys + 1;
+
+  if (original_num_keys >= INTERNAL_NODE_MAX_CELLS) {
+    printf("Need to implement splitting internal node\n");
+    exit(EXIT_FAILURE);
+  }

索引需要插入的新单元格(子节点指针或者键值对儿)位置是根据新产生的子节点中的最大的 Key 决定的。在例子中我们看到,child_max_key 值为 5 并且索引值为 1 。
如果在内部节点没有空间来存放一个单元格了,那么先抛出一个错误。这个(内部节点分裂)稍后再实现它。

现在来看一下函数的:

+
+  uint32_t right_child_page_num = *internal_node_right_child(parent);
+  void* right_child = get_page(table->pager, right_child_page_num);
+
+  if (child_max_key > get_node_max_key(right_child)) {
+    /* Replace right child */
+    *internal_node_child(parent, original_num_keys) = right_child_page_num;
+    *internal_node_key(parent, original_num_keys) =
+        get_node_max_key(right_child);
+    *internal_node_right_child(parent) = child_page_num;
+  } else {
+    /* Make room for the new cell */
+    for (uint32_t i = original_num_keys; i > index; i--) {
+      void* destination = internal_node_cell(parent, i);
+      void* source = internal_node_cell(parent, i - 1);
+      memcpy(destination, source, INTERNAL_NODE_CELL_SIZE);
+    }
+    *internal_node_child(parent, index) = child_page_num;
+    *internal_node_key(parent, index) = child_max_key;
+  }
+}

因为我们存放最右子节点指针和其他子节点指针/键值对儿是分开存储的,如果新的子节点变成了最右子节点,们就必须以不同的方式处理了。
在我们的代码例子中,会进入 else 模块。首先,将其他单元格向右移动一个空格,为新单元格腾出空间。(尽管在我们例子中有0个单元格移动)
接下来,将新的子节点指针和键值写入由索引确定的单元格中。
为了减少测试case的大小,我现在对INTERNAL_NODE_MAX_CELLS进行硬编码。

@@ -126,6 +126,8 @@ const uint32_t INTERNAL_NODE_KEY_SIZE = sizeof(uint32_t);
 const uint32_t INTERNAL_NODE_CHILD_SIZE = sizeof(uint32_t);
 const uint32_t INTERNAL_NODE_CELL_SIZE =
     INTERNAL_NODE_CHILD_SIZE + INTERNAL_NODE_KEY_SIZE;
+/* Keep this small for testing */
+const uint32_t INTERNAL_NODE_MAX_CELLS = 3;

说到测试,我们的大数据集测试通过旧的存根并进入新的存根:

@@ -65,7 +65,7 @@ describe 'database' do
     result = run_script(script)
     expect(result.last(2)).to match_array([
       "db > Executed.",
-      "db > Need to implement updating parent after split",
+      "db > Need to implement splitting internal node",
     ])

非常令人满意,我知道。
我添加另一个测试来打印四节点tree。正好我们测例更多时测试顺序id,这个测试会以伪随机顺序添加记录。

+  it 'allows printing out the structure of a 4-leaf-node btree' do
+    script = [
+      "insert 18 user18 person18@example.com",
+      "insert 7 user7 person7@example.com",
+      "insert 10 user10 person10@example.com",
+      "insert 29 user29 person29@example.com",
+      "insert 23 user23 person23@example.com",
+      "insert 4 user4 person4@example.com",
+      "insert 14 user14 person14@example.com",
+      "insert 30 user30 person30@example.com",
+      "insert 15 user15 person15@example.com",
+      "insert 26 user26 person26@example.com",
+      "insert 22 user22 person22@example.com",
+      "insert 19 user19 person19@example.com",
+      "insert 2 user2 person2@example.com",
+      "insert 1 user1 person1@example.com",
+      "insert 21 user21 person21@example.com",
+      "insert 11 user11 person11@example.com",
+      "insert 6 user6 person6@example.com",
+      "insert 20 user20 person20@example.com",
+      "insert 5 user5 person5@example.com",
+      "insert 8 user8 person8@example.com",
+      "insert 9 user9 person9@example.com",
+      "insert 3 user3 person3@example.com",
+      "insert 12 user12 person12@example.com",
+      "insert 27 user27 person27@example.com",
+      "insert 17 user17 person17@example.com",
+      "insert 16 user16 person16@example.com",
+      "insert 13 user13 person13@example.com",
+      "insert 24 user24 person24@example.com",
+      "insert 25 user25 person25@example.com",
+      "insert 28 user28 person28@example.com",
+      ".btree",
+      ".exit",
+    ]
+    result = run_script(script)

原样是输出如下的样子:

- internal (size 3)
  - leaf (size 7)
    - 1
    - 2
    - 3
    - 4
    - 5
    - 6
    - 7
  - key 1
  - leaf (size 8)
    - 8
    - 9
    - 10
    - 11
    - 12
    - 13
    - 14
    - 15
  - key 15
  - leaf (size 7)
    - 16
    - 17
    - 18
    - 19
    - 20
    - 21
    - 22
  - key 22
  - leaf (size 8)
    - 23
    - 24
    - 25
    - 26
    - 27
    - 28
    - 29
    - 30
db >

仔细观察,你会发现一个bug:

- 5
- 6
- 7
- key 1

key在这个位置应该输出7,不是1!
经过一堆的调试,我发现这是由于糟糕的指针算法导致的。

uint32_t* internal_node_key(void* node, uint32_t key_num) {
-  return internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
+  return (void*)internal_node_cell(node, key_num) + INTERNAL_NODE_CHILD_SIZE;
}

INTERNAL_NODE_CHILD_SIZE值是4。在这里我的想法是想给函数internal_node_cell()的结果加4个字节,但是从函数internal_node_cell()的返回值返回的uint32_t指针,它实际上是加了 4 * sizeof(uint32_t) 个字节。我修改了这里,在计算之前,通过转换一个 void* 来解决了这个问题。
注意!空指针上的指针计算不是 C 标准的一部分,可能不适用于您的编译器(参考:https://stackoverflow.com/questions/3523145/pointer-arithmetic-for-void-pointer-in-c/46238658#46238658)我将来可能会写一篇关于可移植性的文章,但我现在把我的空指针计算留在这里。
好了。迈向全面可操作的 btree 的实现又近又一步。下一步应该是拆分内部节点。(实际上后续作者迟迟没有继续 Orz)
有兴趣可以参考原文:https://cstack.github.io/db_tutorial/


评论 (0 个评论)

facelist

您需要登录后才可以评论 登录 | 立即注册

合作电话:010-64087828

社区邮箱:greatsql@greatdb.com

社区公众号
社区小助手
QQ群
GMT+8, 2024-4-25 15:52 , Processed in 0.016124 second(s), 12 queries , Redis On.
返回顶部