Abstract
performance schema 是一个存储引擎, 可以提供对mysql 所有指标的监控, 是一套非常详细而复杂的监控系统, 不同的指标,使用了不同的接口, 另外有几个特点:
- 它是运行时态, 因此是全内存存储, 重启后会丢失之前的数据
- 为了减少对运行时态的影响, 绝大部分资源都是提前申请好, 在performance_schema 初始化的时候,已经申请好了。涉及到2块内存, 一个是class 配置信息, 一个pfs state
- 不能增加sql 种类和语法
本文主要分 3块:
- 初始化
- 基本数据结构
- 使用过程
初始化
分为几个步骤
- 准备pfs 内部系统的内存监控的类
- 准备好pfs 配置的内存, pfs 配置主要用于设置pfs_xx_class
- 初始化pfs -- 初始化的核心操作, 最主要的是准备好PFS需要的资源, 尤其是内存申请: class、pfs 监控项的container。以mutex 为例: 申请param->m_mutex_class_sizing 个PFS_mutex_class, 存储到mutex_class_array 中, 另外会申请监控项的container, 如global_mutex_container
- 设置好所有的service,
- 把所有的pfs 的key 注册到pfs 中, 方便后续使用
pre_initialize_performance_schema
第一步, 初始化PFS_builtin_memory_class的一些类, 和一些全局状态跟踪
34void pre_initialize_performance_schema() {
pfs_initialized = false;
init_all_builtin_memory_class();
// 初始化类似 builtin_memory_mutex/builtin_memory_rwlock/builtin_memory_mdl 等等
// 这些变量可以跟踪每种指标对应的内存消耗
// builtin_memory_mutex 类型为 PFS_builtin_memory_class
PFS_table_stat::g_reset_template.reset();
// 对PFS_table_stat 类的静态变量g_reset_template 进行重设
// PFS_table_stat 主要成员是
// PFS_table_io_stat m_index_stat[MAX_INDEXES + 1], 跟进这个index 的fetch/insert/update/delete
// PFS_table_lock_stat m_lock_stat; table 有9种锁, 每种锁的状态
global_idle_stat.reset(); // idle 的状态跟踪
global_table_io_stat.reset(); // table io 的状态跟踪,
global_table_lock_stat.reset(); // table 锁的状态跟踪
g_histogram_pico_timers.init(); // PFS_histogram_timers 的状态跟踪
global_statements_histogram.reset(); //PFS_histogram
/*
There is no automatic cleanup. Please either use:
- my_thread_end()
- or PSI_server->delete_current_thread()
in the instrumented code, to explicitly cleanup the instrumentation.
*/
THR_PFS = nullptr; // PFS_thread
for (int i = 0; i < THR_PFS_NUM_KEYS; ++i) {
THR_PFS_contexts[i] = nullptr; //PFS_table_context
}
}1
9/**
Initialize the dynamic array used to hold PFS_INSTRUMENT configuration
options.
*/****
void init_pfs_instrument_array() {
pfs_instr_config_array = new Pfs_instr_config_array(PSI_NOT_INSTRUMENTED);
}
// Preallocated array type holding the parsed performance_schema_instrument
// configuration entries (see init_pfs_instrument_array above).
typedef Prealloced_array<PFS_instr_config *, 10> Pfs_instr_config_array;
41pfs_rc = initialize_performance_schema(
&pfs_param, &psi_thread_hook, &psi_mutex_hook, &psi_rwlock_hook,
&psi_cond_hook, &psi_file_hook, &psi_socket_hook, &psi_table_hook,
&psi_mdl_hook, &psi_idle_hook, &psi_stage_hook, &psi_statement_hook,
&psi_transaction_hook, &psi_memory_hook, &psi_error_hook,
&psi_parallel_query_hook, &psi_parallel_operator_hook,
&psi_data_lock_hook, &psi_system_hook);
if ((pfs_rc != 0) && pfs_param.m_enabled) {
pfs_param.m_enabled = false;
LogErr(WARNING_LEVEL, ER_PERFSCHEMA_INIT_FAILED);
}
initialize_performance_schema () {
pfs_automated_sizing(param); //把PFS_sizing_data large_data 设置到param 中, 主要是类似p->m_events_waits_history_long_sizing
pfs_minimal_setting(param); 如果设置了performance_schema_minimal 为true, 则很多设置全部关掉
init_timers(); //初始化timer 一些偏硬件/操作系统底层函数, 方便获取一些时间
init_event_name_sizing(param); //设置 mutex_class_start/rwlock_class_start,
//在register psi(register_mutex_class)时, 得到PSI_mutex_info->m_event_name_index=mutex_class_start + index
register_global_classes(); // 注冊global 的class in pre_initialize_performance_schema
minimal_global_classes(param); // 当注册performance_schema_minimal 为true, 修改global class的一些enable和m_timed
//
init_sync_class
// 以mutex 为例: 申请param->m_mutex_class_sizing 个PFS_mutex_class,
//存储到mutex_class_array 后, 后面register 会进行设置, 在init 会查找, 申请的内存变化会更新到builtin_memory_mutex_class
init_thread_class
init_table_share // 初始化global_table_share_container, 但没有真正申请内存
//typedef PFS_buffer_scalable_container<PFS_table_share, 4 * 1024, 4 * 1024> PFS_table_share_container;
init_table_share_lock_stat
// 很多数据结构使用无锁hash 来存储
// 设置一大堆consumer的flag
flag_events_stages_current =
param->m_consumer_events_stages_current_enabled;
init_pfs_plugin_table
// PFS_dynamic_table_shares::init_mutex
//
}1
。。。
// For each hook filled in by initialize_performance_schema(), fetch the
// versioned interface and install it as the active PSI service (memory is
// shown here; the same pattern repeats for mutex, rwlock, file, ...).
if (psi_memory_hook != NULL) {
service = psi_memory_hook->get_interface(PSI_CURRENT_MEMORY_VERSION);
if (service != NULL) {
set_psi_memory_service(service);
}
}
。。。
如果是pfs 插件编译, 就会使用这个, 但目前是直接编译进内核, 因此不需要插件化加载:
mysql_service_psi_mutex_v1_t mysql_service_psi_mutex_v1 = imp_performance_schema_psi_mutex_v1
其中 mysql_service_psi_mutex_v1_t imp_performance_schema_psi_mutex_v1 定义在 storage/perfschema/pfs.cc 中
初始化所有的key
初始化所有的key, 提前把一部分 register mutex/memory psi 等结构 注册进去
15/*
Now that we have parsed the command line arguments, and have initialized
the performance schema itself, the next step is to register all the
server instruments.
*/
static void init_server_psi_keys(void) {.
...
count = static_cast<int>(array_elements(all_server_mutexes));
mysql_mutex_register(category, all_server_mutexes, count);
count = static_cast<int>(array_elements(all_server_rwlocks));
mysql_rwlock_register(category, all_server_rwlocks, count);
...
}
公共数据结构, 后续会使用到, 先列在这里: https://dev.mysql.com/doc/dev/mysql-server/8.0.20/structPFS__instr.html
22struct PFS_instr {
/** Internal lock. */
pfs_lock m_lock;
/** Enabled flag. */
bool m_enabled;
/** Timed flag. */
bool m_timed;
/** Container page. */
PFS_opaque_container_page *m_page; // 参考PFS_partitioned_buffer_scalable_container
};
// Entries of this type are stored in pfs_instr_config_array.
struct PFS_instr_config {
  /* Instrument name. */
  char *m_name;
  /* Name length. */
  uint m_name_length;
  /** Enabled flag. */
  bool m_enabled;
  /** Timed flag. */
  bool m_timed;
};
/** Discriminator stored in PFS_instr_class::m_type. */
enum PFS_class_type {
  PFS_CLASS_NONE = 0,
  PFS_CLASS_MUTEX = 1,
  PFS_CLASS_RWLOCK = 2,
  PFS_CLASS_COND = 3,
  PFS_CLASS_FILE = 4,
  PFS_CLASS_TABLE = 5,
  PFS_CLASS_STAGE = 6,
  PFS_CLASS_STATEMENT = 7,
  PFS_CLASS_TRANSACTION = 8,
  PFS_CLASS_SOCKET = 9,
  PFS_CLASS_TABLE_IO = 10,
  PFS_CLASS_TABLE_LOCK = 11,
  PFS_CLASS_IDLE = 12,
  PFS_CLASS_MEMORY = 13,
  PFS_CLASS_METADATA = 14,
  PFS_CLASS_ERROR = 15,
  PFS_CLASS_THREAD = 16,
  /* Reserve 17-29 for official mysql */
  PFS_CLASS_PARALLEL_QUERY = 30,
  PFS_CLASS_LAST = PFS_CLASS_PARALLEL_QUERY,
  PFS_CLASS_MAX = PFS_CLASS_LAST + 1
};
/** Basic statistics aggregate: count / sum / min / max of observed values. */
struct PFS_single_stat {
  /** Count of values. */
  ulonglong m_count;
  /** Sum of values. */
  ulonglong m_sum;
  /** Minimum value. */
  ulonglong m_min;
  /** Maximum value. */
  ulonglong m_max;
};
/** Instrumentation metadata for a mutex. */
struct PFS_ALIGNED PFS_mutex_class : public PFS_instr_class {
  /** Mutex usage statistics. */
  PFS_mutex_stat m_mutex_stat;
  /** Singleton instance. */
  PFS_mutex *m_singleton;
};
/** Information for all instrumentation. */
// Common base of the per-kind class structs (e.g. PFS_mutex_class above).
struct PFS_instr_class {
/** Class type */
PFS_class_type m_type;
/** True if this instrument is enabled. */
bool m_enabled;
/** True if this instrument is timed. */
bool m_timed;
/** Instrument flags. */
uint m_flags;
/** Volatility index. */
int m_volatility;
/**
Instrument name index.
Self index in:
- EVENTS_WAITS_SUMMARY_*_BY_EVENT_NAME for waits
- EVENTS_STAGES_SUMMARY_*_BY_EVENT_NAME for stages
- EVENTS_STATEMENTS_SUMMARY_*_BY_EVENT_NAME for statements
- EVENTS_TRANSACTIONS_SUMMARY_*_BY_EVENT_NAME for transactions
*/
uint m_event_name_index;
/** Instrument name. */
char m_name[PFS_MAX_INFO_NAME_LENGTH];
/** Length in bytes of @c m_name. */
uint m_name_length;
/** Documentation. */
char *m_documentation;
};
以mutex 为例, 在配置文件中,打开performance_schema, 对部分的配置进行单独设置
对于performance schema 关闭的情况下, psi_mutex_service 对应的就是psi_mutex_noop 对于打开performance schema情况下, 对应的是 pfs_mutex_service_v1 每个配置项是 performance_schema_instrument = ' wait/synch/mutex = ON ' 是一行, 可以多项,表示enable 多个pfs 监控项, 如果不是精确匹配的话, 就建议增加正则匹配% 来代表所有
m_consumer_global_instrumentation_enabled 可以控制如锁之类(mutex/lock/rwlock/cond), 文件(file), table 之类, 控制范围比较广, 默认为true performance_schema_consumer_thread_instrumentation, thread 相关的都由他控制, 默认为true,
performance_schema = ON
使用接口
对mutex 的使用和使用pthread mutex 的行为基本一致, 可以参考 components/pfs_example/pfs_example.cc
mysql_mutex_register --> psi_mutex_service->register_mutex --> pfs_register_mutex_v1
mysql_mutex_init --> psi_mutex_service->init_mutex /my_mutex_init
mysql_mutex_destroy
mysql_mutex_lock
mysql_mutex_trylock
mysql_mutex_unlock
// Example from components/pfs_example: declare the keys and info array,
// register them once, then use the instrumented mutexes exactly like
// plain pthread mutexes.
static PSI_mutex_key key_mutex_x = 0;
static PSI_mutex_key key_mutex_y = 0;
static PSI_mutex_info all_example_mutex[] = {
    {&key_mutex_x, "X", PSI_FLAG_SINGLETON, PSI_VOLATILITY_PERMANENT,
     "Example doc, permanent mutex, singleton."},
    {&key_mutex_y, "Y", 0, PSI_VOLATILITY_QUERY,
     "Example doc, very volatile mutexes."}};
static mysql_mutex_t my_mutex_x;
static mysql_mutex_t my_mutex_y;
// ... executed inside the component's init code:
mysql_mutex_register("pfs_example", all_example_mutex, 2);
mysql_mutex_init(key_mutex_x, &my_mutex_x, NULL);
mysql_mutex_init(key_mutex_y, &my_mutex_y, NULL);
mysql_mutex_lock(&my_mutex_x);
mysql_mutex_trylock(&my_mutex_y);
mysql_mutex_unlock(&my_mutex_y);
mysql_mutex_unlock(&my_mutex_x);
mysql_mutex_destroy(&my_mutex_x);
mysql_mutex_destroy(&my_mutex_y);
关键的数据结构
struct mysql_mutex_t { ... }
使用流程
- 注册 使用上,需要先register psi, 类似这样
先创建PSI_mutex_key/PSI_mutex_info, 然后进行注册
参考之前的使用方式
注册函数解析
10pfs_register_mutex_v1 {
// 生成formatted_name <-- mutex_instrument_prefix.str + / + category + / + PSI_mutex_info_v1.m_name
key = register_mutex_class(formatted_name, (uint)full_length, info);
*(PSI_mutex_info_v1->m_key) = key
//register_mutex_class 功能
// 从mutex_class_array 中找到一个空的 PFS_mutex_class, 这个index 后面存储到*(PSI_mutex_info_v1->m_key)
// init_instr_class, 初始化这个PFS_mutex_class,
// configure_instr_class , 从pfs_instr_config_array 中找有没有和PFS_mutex_class->m_name 正则匹配的, 则设置PFS_mutex_class
}
static inline int inline_mysql_mutex_init( ... )
- lock/unlock 的过程: 当开始加锁的时候, 如果 enable 了 time 跟踪, 会记录下申请锁到获得锁的时间戳; 在获得锁的时候, 会把等待时间累加进去, 并记录获得锁的时间。如果 enable 了 thread, 会把时间累加到 event_name_array[index] 上; 如果 enable 了 FLAG_EVENT, 会生成一个 PFS_events_waits event, 然后插入 insert_events_waits_history。unlock 的时候, 就直接把指针指向空, 这个地方没有跟踪 lock 的时间。

### 查询 performance schema
197static inline int inline_mysql_mutex_lock(
mysql_mutex_t *that, const char *src_file MY_ATTRIBUTE((unused)),
uint src_line MY_ATTRIBUTE((unused))) {
int result;
if (that->m_psi != NULL) {
/* Instrumentation start */
PSI_mutex_locker *locker;
PSI_mutex_locker_state state;
locker = PSI_MUTEX_CALL(start_mutex_wait)(
&state, that->m_psi, PSI_MUTEX_LOCK, src_file, src_line); --> pfs_start_mutex_wait_v1
/* Instrumented code */
result = my_mutex_lock(&that->m_mutex);
/* Instrumentation end */
if (locker != NULL) {
PSI_MUTEX_CALL(end_mutex_wait)(locker, result);
}
return result;
}
/* Non instrumented code */
result = my_mutex_lock(&that->m_mutex);
return result;
}
// PSI v1 entry point called just before waiting on an instrumented mutex
// (see inline_mysql_mutex_lock above). Returns the caller-provided state,
// reinterpreted as a locker, when instrumentation applies; returns NULL on
// every shortcut path where nothing needs to be recorded at end_mutex_wait.
PSI_mutex_locker *pfs_start_mutex_wait_v1(PSI_mutex_locker_state *state,
PSI_mutex *mutex,
PSI_mutex_operation op,
const char *src_file, uint src_line) {
PFS_mutex *pfs_mutex = reinterpret_cast<PFS_mutex *>(mutex);
DBUG_ASSERT((int)op >= 0);
DBUG_ASSERT((uint)op < array_elements(mutex_operation_map));
DBUG_ASSERT(state != NULL);
DBUG_ASSERT(pfs_mutex != NULL);
DBUG_ASSERT(pfs_mutex->m_class != NULL);
// Instrument disabled for this mutex instance: record nothing.
if (!pfs_mutex->m_enabled) {
return NULL;
}
uint flags;
ulonglong timer_start = 0;
if (flag_thread_instrumentation) {
// Per-thread instrumentation path: requires an enabled PFS_thread.
PFS_thread *pfs_thread = my_thread_get_THR_PFS();
if (unlikely(pfs_thread == NULL)) {
return NULL;
}
if (!pfs_thread->m_enabled) {
return NULL;
}
state->m_thread = reinterpret_cast<PSI_thread *>(pfs_thread);
flags = STATE_FLAG_THREAD;
// Take the start timestamp only when timing is enabled for this mutex.
if (pfs_mutex->m_timed) {
timer_start = get_wait_timer();
state->m_timer_start = timer_start;
flags |= STATE_FLAG_TIMED;
}
if (flag_events_waits_current) {
// Produce a PFS_events_waits record on the thread's wait stack.
if (unlikely(pfs_thread->m_events_waits_current >=
&pfs_thread->m_events_waits_stack[WAIT_STACK_SIZE])) {
// Wait stack exhausted: count the lost locker and bail out.
locker_lost++;
return NULL;
}
PFS_events_waits *wait = pfs_thread->m_events_waits_current;
state->m_wait = wait;
flags |= STATE_FLAG_EVENT;
// The previous stack entry is the parent (nesting) event.
PFS_events_waits *parent_event = wait - 1;
wait->m_event_type = EVENT_TYPE_WAIT;
wait->m_nesting_event_id = parent_event->m_event_id;
wait->m_nesting_event_type = parent_event->m_event_type;
wait->m_thread_internal_id = pfs_thread->m_thread_internal_id;
wait->m_class = pfs_mutex->m_class;
wait->m_timer_start = timer_start;
wait->m_timer_end = 0;
wait->m_object_instance_addr = pfs_mutex->m_identity;
wait->m_event_id = pfs_thread->m_event_id++;
wait->m_end_event_id = 0;
wait->m_operation = mutex_operation_map[(int)op];
wait->m_source_file = src_file;
wait->m_source_line = src_line;
wait->m_wait_class = WAIT_CLASS_MUTEX;
pfs_thread->m_events_waits_current++;
}
} else {
// No thread instrumentation: either timed-only, or a complete shortcut.
if (pfs_mutex->m_timed) {
timer_start = get_wait_timer();
state->m_timer_start = timer_start;
flags = STATE_FLAG_TIMED;
state->m_thread = NULL;
} else {
/*
Complete shortcut.
*/
/* Aggregate to EVENTS_WAITS_SUMMARY_BY_INSTANCE (counted) */
pfs_mutex->m_mutex_stat.m_wait_stat.aggregate_counted();
return NULL;
}
}
state->m_flags = flags;
state->m_mutex = mutex;
return reinterpret_cast<PSI_mutex_locker *>(state);
}
// PSI v1 entry point called once the mutex wait finished; rc is the result
// of the underlying lock call (0 on success). Aggregates the wait into the
// per-instance stats and, depending on the flags captured at start time,
// into the per-thread event-name stats and the events_waits history.
void pfs_end_mutex_wait_v1(PSI_mutex_locker *locker, int rc) {
PSI_mutex_locker_state *state =
reinterpret_cast<PSI_mutex_locker_state *>(locker);
DBUG_ASSERT(state != NULL);
ulonglong timer_end = 0;
ulonglong wait_time = 0;
PFS_mutex *mutex = reinterpret_cast<PFS_mutex *>(state->m_mutex);
DBUG_ASSERT(mutex != NULL);
PFS_thread *thread = reinterpret_cast<PFS_thread *>(state->m_thread);
uint flags = state->m_flags;
if (flags & STATE_FLAG_TIMED) {
timer_end = get_wait_timer();
wait_time = timer_end - state->m_timer_start;
/* Aggregate to EVENTS_WAITS_SUMMARY_BY_INSTANCE (timed) */
mutex->m_mutex_stat.m_wait_stat.aggregate_value(wait_time);
} else {
/* Aggregate to EVENTS_WAITS_SUMMARY_BY_INSTANCE (counted) */
mutex->m_mutex_stat.m_wait_stat.aggregate_counted();
}
// On a successful lock, record the new owner and when it took the lock.
if (likely(rc == 0)) {
mutex->m_owner = thread;
mutex->m_last_locked = timer_end;
}
if (flags & STATE_FLAG_THREAD) {
// Per-thread aggregation, indexed by the class's event-name index.
PFS_single_stat *event_name_array;
event_name_array = thread->write_instr_class_waits_stats();
uint index = mutex->m_class->m_event_name_index;
DBUG_ASSERT(index <= wait_class_max);
DBUG_ASSERT(sanitize_thread(thread) != NULL);
if (flags & STATE_FLAG_TIMED) {
/* Aggregate to EVENTS_WAITS_SUMMARY_BY_THREAD_BY_EVENT_NAME (timed) */
event_name_array[index].aggregate_value(wait_time);
} else {
/* Aggregate to EVENTS_WAITS_SUMMARY_BY_THREAD_BY_EVENT_NAME (counted) */
event_name_array[index].aggregate_counted();
}
if (flags & STATE_FLAG_EVENT) {
// Finish the PFS_events_waits record started in start_mutex_wait and
// pop it off the thread's wait stack, inserting it into the history
// tables when those consumers are enabled.
PFS_events_waits *wait =
reinterpret_cast<PFS_events_waits *>(state->m_wait);
DBUG_ASSERT(wait != NULL);
wait->m_timer_end = timer_end;
wait->m_end_event_id = thread->m_event_id;
if (thread->m_flag_events_waits_history) {
insert_events_waits_history(thread, wait);
}
if (thread->m_flag_events_waits_history_long) {
insert_events_waits_history_long(wait);
}
thread->m_events_waits_current--;
DBUG_ASSERT(wait == thread->m_events_waits_current);
}
}
}
// Unlock is simple: clear the recorded owner and last-locked timestamp on
// the PFS object. Note there is no timing of the held-lock duration here.
void pfs_unlock_mutex_v1(PSI_mutex *mutex) {
PFS_mutex *pfs_mutex = reinterpret_cast<PFS_mutex *>(mutex);
DBUG_ASSERT(pfs_mutex != NULL);
/*
Note that this code is still protected by the instrumented mutex,
and therefore is thread safe. See inline_mysql_mutex_unlock().
*/
/* Always update the instrumented state */
pfs_mutex->m_owner = NULL;
pfs_mutex->m_last_locked = 0;
}
示例查询:
> show tables from performance_schema;
> show tables like 'events_statement%';
> show tables like 'events_wait%';
> select * from events_statements_history;