MySQL 源码解读 -- 多线程

多线程

数据类型

Mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct mysql_mutex_t {
/** The real mutex. */
my_mutex_t m_mutex;
/**
The instrumentation hook.
Note that this hook is not conditionally defined,
for binary compatibility of the @c mysql_mutex_t interface.
*/
struct PSI_mutex *m_psi{nullptr};
};


typedef pthread_mutex_t native_mutex_t;

struct my_mutex_t {
union u {
native_mutex_t m_native;
safe_mutex_t *m_safe_ptr; // 当在非linux下就使用safe_mutex_t, 同时会打开宏SAFE_MUTEX
} m_u;
};
typedef struct my_mutex_t my_mutex_t;

其中PSI_mutex 是一个抽象类, 会根据key的类型进行动态创建类型

1
2
3
4
5
6
7
8
9
10
PSI_mutex *pfs_init_mutex_v1(PSI_mutex_key key, const void *identity) {
PFS_mutex_class *klass;
PFS_mutex *pfs;
klass = find_mutex_class(key);
if (unlikely(klass == NULL)) {
return NULL;
}
pfs = create_mutex(klass, identity);
return reinterpret_cast<PSI_mutex *>(pfs);
}

如果key 为0, 是不会创建PSI_mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct PSI_mutex_info_v1 {
/**
Pointer to the key assigned to the registered mutex.
*/
PSI_mutex_key *m_key;
/**
The name of the mutex to register.
*/
const char *m_name;
/**
The flags of the mutex to register.
@sa PSI_FLAG_SINGLETON
*/
unsigned int m_flags;
/** Volatility index. */
int m_volatility;
/** Documentation. */
const char *m_documentation;
};
typedef PSI_mutex_info_v1 PSI_mutex_info;

举例来说
在thread pool

1
2
3
4
5
static PSI_mutex_key key_LOCK_thread_cache;

static PSI_mutex_info all_per_thread_mutexes[] = {
{&key_LOCK_thread_cache, "LOCK_thread_cache", PSI_FLAG_SINGLETON, 0,
PSI_DOCUMENT_ME}};

mysql_mutex_register

mysql_mutex_register(“sql”, all_per_thread_mutexes, count);
–> psi_mutex_service.register_mutex(category, info, count);
      –> pfs_register_mutex_v1(category, info, count)  {
               REGISTER_BODY_V1(PSI_mutex_key, mutex_instrument_prefix, register_mutex_class);
            }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#define REGISTER_BODY_V1(KEY_T, PREFIX, REGISTER_FUNC)                         \
KEY_T key; \
char formatted_name[PFS_MAX_INFO_NAME_LENGTH]; \
size_t prefix_length; \
size_t len; \
size_t full_length; \
\
DBUG_ASSERT(category != NULL); \
DBUG_ASSERT(info != NULL); \
if (unlikely( \
build_prefix(&PREFIX, category, formatted_name, &prefix_length)) || \
//formatted_name.append(PREFIX->str).append("/").append(category).append("/")
!pfs_initialized) { \
for (; count > 0; count--, info++) *(info->m_key) = 0; \
return; \
} \
\
for (; count > 0; count--, info++) { \
DBUG_ASSERT(info->m_key != NULL); \
DBUG_ASSERT(info->m_name != NULL); \
len = strlen(info->m_name); \
full_length = prefix_length + len; \
if (likely(full_length <= PFS_MAX_INFO_NAME_LENGTH)) { \
memcpy(formatted_name + prefix_length, info->m_name, len); \
key = REGISTER_FUNC(formatted_name, (uint)full_length, info); \
} else { \
pfs_print_error("REGISTER_BODY_V1: name too long <%s> <%s>\n", category, \
info->m_name); \
key = 0; \
} \
\
*(info->m_key) = 2; \
} \
return

1
2
3
4
5
6
7
8
9
10
11
typedef struct MYSQL_LEX_STRING LEX_STRING;
typedef struct MYSQL_LEX_CSTRING LEX_CSTRING;
struct MYSQL_LEX_STRING {
char *str;
size_t length;
};

struct MYSQL_LEX_CSTRING {
const char *str;
size_t length;
};

cond

1
2
3
4
5
6
7
8
9
10
11
12
typedef pthread_cond_t native_cond_t;

struct mysql_cond_t {
/** The real condition */
native_cond_t m_cond;
/**
The instrumentation hook.
Note that this hook is not conditionally defined,
for binary compatibility of the @c mysql_cond_t interface.
*/
struct PSI_cond *m_psi;
};
1
2
3
4
5
6
7
8
9
static PSI_cond_key key_COND_thread_cache;
static PSI_cond_key key_COND_flush_thread_cache;

static PSI_cond_info all_per_thread_conds[] = {
{&key_COND_thread_cache, "COND_thread_cache", PSI_FLAG_SINGLETON, 0,
PSI_DOCUMENT_ME},
{&key_COND_flush_thread_cache, "COND_flush_thread_cache",
PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}};

Thread pool

Connection_handler_manager

1
2
3
4
5
6
switch (Connection_handler_manager::thread_handling) {
case SCHEDULER_ONE_THREAD_PER_CONNECTION:
connection_handler = new (std::nothrow) Per_thread_connection_handler();
break;
case SCHEDULER_NO_THREADS:
connection_handler = new (std::nothrow) One_thread_connection_handler();