MySQL 源码解读 -- PFS

pfs buffer container

很3个核心的类,

  1. template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT,
    class U = PFS_buffer_default_array,
    class V = PFS_buffer_default_allocator>
    class PFS_buffer_scalable_container
  2. template
    class PFS_buffer_default_allocator
  3. template
    class PFS_buffer_default_array

PFS_buffer_scalable_container 数据结构
是一个类似buffer pool的算法, 其中存在n 个page, 每个page 有m 个row
一个page 就是由PFS_buffer_default_array 来管理
一个page 申请内存时, 就是调用PFS_buffer_default_allocator 来申请内存
一个row 就一个T

PFS_buffer_scalable_container

類似的拓展類, PFS_buffer_container, PFS_partitioned_buffer_scalable_container

以mutex 为例, 进行解释

typedef PFS_buffer_scalable_container<PFS_mutex, 1024, 1024>
PFS_mutex_basic_container;
typedef PFS_partitioned_buffer_scalable_container<PFS_mutex_basic_container,
PFS_MUTEX_PARTITIONS>
PFS_mutex_container;
#define PFS_MUTEX_PARTITIONS 2

PFS_buffer_default_allocator default_mutex_allocator(
&builtin_memory_mutex);
PFS_mutex_container global_mutex_container(&default_mutex_allocator);

1
2
3
4
5
6
7
8
template <class B, int PFS_PARTITION_COUNT>
class PFS_partitioned_buffer_scalable_container {
// 设计非常简单,
B *m_partitions[PFS_PARTITION_COUNT];

所有操作都根据partition index 来找到对应的PFS_buffer_scalable_container 进行操作
如果需要求总值, 就将每个partition的对应值进行累加
}

PFS_buffer_scalable_container 数据结构
是一个类似buffer pool的算法, 其中存在n 个page, 每个page 1024 个row, 一个row就是一个B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT,
class U = PFS_buffer_default_array<T>,
class V = PFS_buffer_default_allocator<T>>
class PFS_buffer_scalable_container {
bool m_initialized;
bool m_full;
size_t m_max;
PFS_cacheline_atomic_size_t m_monotonic; // 当前申请row的那个page index
PFS_cacheline_atomic_size_t m_max_page_index; // 已经申请了多少page
size_t m_max_page_count; // 可以申请最大页数
size_t m_last_page_size; // 最后一页的page size
std::atomic<array_type *> m_pages[PFS_PAGE_COUNT];
allocator_type *m_allocator;
native_mutex_t m_critical_section;

/* dynamic shrink related */
/* now only PFS_thread_container allowed dynamic shrink */
bool m_dynamic_shrink;
/* each slot's page refcount */
std::atomic<size_t> m_page_refcounts[PFS_PAGE_COUNT];
/* each slot's page version snapshot */
size_t m_page_versions[PFS_PAGE_COUNT];
uint m_current_reclaim_begin;
uint m_current_reclaim_end;
public:
/* statistics of dynamic shrink */
std::atomic<size_t> page_spin_waits;
std::atomic<size_t> rc_spin_waits;
std::atomic<size_t> allocate_retry_count;
std::atomic<size_t> reclaim_fail_count;
std::atomic<size_t> reclaim_success_count;

int init(long max_size, bool dyn_shrink=false) {
// 只是把对应的设置成员变量设置好, 但内存还没有申请, m_pages[PFS_PAGE_COUNT] 指向都为空
// 调用方式类似这样, param->m_mutex_sizing 由配置参数performance_schema_max_mutex_classes进行设置
//if (global_mutex_container.init(param->m_mutex_sizing)) {
// return 1;
//}
}

value_type *allocate(pfs_dirty_state *dirty_state) {
// 第一步
// 根据m_monotonic 和m_max_page_index , 从已有的page中申请一个row 满足,
// pfs = safe_allocate_value(index, dirty_state);
// 如果可以申请,则返回, 否则 执行下一步

// 第二步
//array = load_page_no_pending(current_page_count);
// 其实就是m_pages[index].load();但如果对应的page 处在pending状态会等待
// 如果array 为空, 则array = allocate_one_array(current_page_count, true);
// 在allocate_one_array 函数中
// 如果设置了shrink, 则等待m_page_refcounts[index] 降为0
// 申请一个PFS_buffer_default_array, 并设置这个PFS_buffer_default_array
// 然后调用PFS_buffer_default_allocator>alloc_array(PFS_buffer_default_array) 申请rows 的内存
// 申请一个page size个row
//

// 第三步
调用safe_allocate_value , 如果失败,从第二步再试
}
}

PFS_buffer_default_array

这个类核心提供一个allocate 和deallocate 接口
类成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <class T>
class PFS_buffer_default_array {
bool m_full;
PFS_cacheline_atomic_size_t m_monotonic;
T *m_ptr;
size_t m_max;
/** Container. */
PFS_opaque_container *m_container;
/*
version number for dynamic reclaim, each time array is used,
version increase.
*/
PFS_cacheline_atomic_size_t m_version;
/*
indicating whether page is used, this is to avoid new
allocated page been freed without ever being used.
*/
std::atomic<bool> m_used{false};
/** page index */
uint m_page_index;
}

PFS_buffer_default_array

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <class T>
class PFS_buffer_default_array {
bool m_full;
PFS_cacheline_atomic_size_t m_monotonic;
T *m_ptr;
size_t m_max;
/** Container. */
PFS_opaque_container *m_container;
/*
version number for dynamic reclaim, each time array is used,
version increase.
*/
PFS_cacheline_atomic_size_t m_version;
/*
indicating whether page is used, this is to avoid new
allocated page been freed without ever being used.
*/
std::atomic<bool> m_used{false};
/** page index */
uint m_page_index;
}

唯一一个额外的地方,是alloc 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
value_type *allocate(pfs_dirty_state *dirty_state) {
根据monotonic, 拿到对应位置的内存, 然后标记这个地方被申请
if (pfs->m_lock.free_to_dirty(dirty_state)) {
/* increase version */
size_t version = m_version.m_size_t++;
if (unlikely(version == SIZE_MAX)) {
m_version.m_size_t.store(1);
}
m_used = true;
return pfs;
}

以PFS_mutex 为例,
}