pfs buffer container
很3个核心的类, 1. template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U = PFS_buffer_default_array
PFS_buffer_scalable_container 数据结构 是一个类似buffer pool的算法, 其中存在n 个page, 每个page 有m 个row 一个page 就是由PFS_buffer_default_array 来管理 一个page 申请内存时, 就是调用PFS_buffer_default_allocator 来申请内存 一个row 就一个T
PFS_buffer_scalable_container
類似的拓展類, PFS_buffer_container, PFS_partitioned_buffer_scalable_container
以mutex 为例, 进行解释
typedef PFS_buffer_scalable_container<PFS_mutex, 1024, 1024> PFS_mutex_basic_container; typedef PFS_partitioned_buffer_scalable_container<PFS_mutex_basic_container, PFS_MUTEX_PARTITIONS> PFS_mutex_container; #define PFS_MUTEX_PARTITIONS 2
PFS_buffer_default_allocator
1 | template <class B, int PFS_PARTITION_COUNT> |
PFS_buffer_scalable_container 数据结构 是一个类似buffer pool的算法, 其中存在n 个page, 每个page 1024 个row, 一个row就是一个B 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT,
class U = PFS_buffer_default_array<T>,
class V = PFS_buffer_default_allocator<T>>
class PFS_buffer_scalable_container {
bool m_initialized;
bool m_full;
size_t m_max;
PFS_cacheline_atomic_size_t m_monotonic; // 当前申请row的那个page index
PFS_cacheline_atomic_size_t m_max_page_index; // 已经申请了多少page
size_t m_max_page_count; // 可以申请最大页数
size_t m_last_page_size; // 最后一页的page size
std::atomic<array_type *> m_pages[PFS_PAGE_COUNT];
allocator_type *m_allocator;
native_mutex_t m_critical_section;
/* dynamic shrink related */
/* now only PFS_thread_container allowed dynamic shrink */
bool m_dynamic_shrink;
/* each slot's page refcount */
std::atomic<size_t> m_page_refcounts[PFS_PAGE_COUNT];
/* each slot's page version snapshot */
size_t m_page_versions[PFS_PAGE_COUNT];
uint m_current_reclaim_begin;
uint m_current_reclaim_end;
public:
/* statistics of dynamic shrink */
std::atomic<size_t> page_spin_waits;
std::atomic<size_t> rc_spin_waits;
std::atomic<size_t> allocate_retry_count;
std::atomic<size_t> reclaim_fail_count;
std::atomic<size_t> reclaim_success_count;
int init(long max_size, bool dyn_shrink=false) {
// 只是把对应的设置成员变量设置好, 但内存还没有申请, m_pages[PFS_PAGE_COUNT] 指向都为空
// 调用方式类似这样, param->m_mutex_sizing 由配置参数performance_schema_max_mutex_classes进行设置
//if (global_mutex_container.init(param->m_mutex_sizing)) {
// return 1;
//}
}
value_type *allocate(pfs_dirty_state *dirty_state) {
// 第一步
// 根据m_monotonic 和m_max_page_index , 从已有的page中申请一个row 满足,
// pfs = safe_allocate_value(index, dirty_state);
// 如果可以申请,则返回, 否则 执行下一步
// 第二步
//array = load_page_no_pending(current_page_count);
// 其实就是m_pages[index].load();但如果对应的page 处在pending状态会等待
// 如果array 为空, 则array = allocate_one_array(current_page_count, true);
// 在allocate_one_array 函数中
// 如果设置了shrink, 则等待m_page_refcounts[index] 降为0
// 申请一个PFS_buffer_default_array, 并设置这个PFS_buffer_default_array
// 然后调用PFS_buffer_default_allocator>alloc_array(PFS_buffer_default_array) 申请rows 的内存
// 申请一个page size个row
//
// 第三步
调用safe_allocate_value , 如果失败,从第二步再试
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21template <class T>
class PFS_buffer_default_array {
bool m_full;
PFS_cacheline_atomic_size_t m_monotonic;
T *m_ptr;
size_t m_max;
/** Container. */
PFS_opaque_container *m_container;
/*
version number for dynamic reclaim, each time array is used,
version increase.
*/
PFS_cacheline_atomic_size_t m_version;
/*
indicating whether page is used, this is to avoid new
allocated page been freed without ever being used.
*/
std::atomic<bool> m_used{false};
/** page index */
uint m_page_index;
}
1 | template <class T> |
唯一一个额外的地方,是alloc 函数 1
2
3
4
5
6
7
8
9
10
11
12
13
14value_type *allocate(pfs_dirty_state *dirty_state) {
根据monotonic, 拿到对应位置的内存, 然后标记这个地方被申请
if (pfs->m_lock.free_to_dirty(dirty_state)) {
/* increase version */
size_t version = m_version.m_size_t++;
if (unlikely(version == SIZE_MAX)) {
m_version.m_size_t.store(1);
}
m_used = true;
return pfs;
}
以PFS_mutex 为例,
}