MySQL 源码解读 -- 调度器

Thread pool & scheduler

init

1
2
3
4
5
6
7
8
Event:init
--> thd = new THD(); thd-> store_globals(); //
--> event_queue = new Event_queue
--> scheduler = new Event_scheduler(event_queue)
--> event_queue->init_queue() || load_events_from_db(thd, event_queue)
--> scheduler->start(&err_no)


启动调度器 – Event_scheduler::start
event_scheduler_start

Event_scheduler::start —> 创建thd 和一个线程
event_scheduler_thread
event_scheduler_thread

Event_queue

Event_queue 核心类,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* The sorted queue with the Event_queue_element objects */
Priority_queue<Event_queue_element *,
std::vector<Event_queue_element *,
Malloc_allocator<Event_queue_element *>>,
Event_queue_less>
queue;

使用
queue(Event_queue_less(),
Malloc_allocator<Event_queue_element *>(
key_memory_Event_scheduler_scheduler_param))

Priority_queue 定义
template <typename T, typename Container = std::vector<T>,
typename Less = std::less<typename Container::value_type>>
class Priority_queue : public Less {
...
}

构造函数
Priority_queue(Less const &less = Less(),
const allocator_type &alloc = allocator_type())
: Base(less), m_container(alloc) {}

插入event
bool Event_queue::create_event(THD *thd, Event_queue_element *new_element,
bool *created)
{
new_element->compute_next_execution_time(thd);
*created = (queue.push(new_element) == false);
mysql_cond_broadcast(&COND_queue_state);
}

Event_scheduler

1
2
3
4
5
6
7
类定义
Event_scheduler {
enum enum_state state; //enum enum_state { INITIALIZED = 0, RUNNING, STOPPING };
THD *scheduler_thd;
Event_queue *queue;
ulonglong started_events;
}