Thread pool & scheduler
init
    Events::init
      --> thd = new THD(); thd->store_globals();
      --> event_queue = new Event_queue
      --> scheduler = new Event_scheduler(event_queue)
      --> event_queue->init_queue() || load_events_from_db(thd, event_queue)
      --> scheduler->start(&err_no)
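The `||` between init_queue() and load_events_from_db() only makes sense if both calls return true on failure, which is the usual convention in the MySQL server code. The sketch below illustrates that short-circuit pattern under that assumption. Every name in it (`init_queue_step`, `load_events_step`, `start_scheduler_step`, `init_events`, `g_trace`) is a hypothetical stand-in, not a MySQL function.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins: each step returns true on FAILURE, the convention
// the `||` chain in the init flow is assumed to rely on.
static std::vector<std::string> g_trace;  // records which steps actually ran

bool init_queue_step(bool fail) {
  g_trace.push_back("init_queue");
  return fail;
}

bool load_events_step(bool fail) {
  g_trace.push_back("load_events");
  return fail;
}

bool start_scheduler_step(bool fail) {
  g_trace.push_back("start");
  return fail;
}

// Returns true if initialization failed at any step.
bool init_events(bool queue_fails, bool load_fails, bool start_fails) {
  g_trace.clear();
  // `||` short-circuits: if init_queue_step fails, load_events_step never runs.
  if (init_queue_step(queue_fails) || load_events_step(load_fails)) {
    return true;
  }
  return start_scheduler_step(start_fails);
}

// Usage check: a failing first step must skip the remaining ones.
void check_init_order() {
  assert(init_events(true, false, false));
  assert(g_trace.size() == 1);
}
```

With this convention a caller needs a single `if` to abort initialization at the first failing step.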
Starting the scheduler: Event_scheduler::start
Event_scheduler::start --> creates a THD and a thread whose entry function is event_scheduler_thread
Event_queue
Event_queue is the core class; it keeps the events in a priority queue.
    /* The sorted queue with the Event_queue_element objects */
    Priority_queue<Event_queue_element *,
                   std::vector<Event_queue_element *,
                               Malloc_allocator<Event_queue_element *>>,
                   Event_queue_less>
        queue;

It is constructed with:

    queue(Event_queue_less(),
          Malloc_allocator<Event_queue_element *>(
              key_memory_Event_scheduler_scheduler_param))

Priority_queue definition:

    template <typename T, typename Container = std::vector<T>,
              typename Less = std::less<typename Container::value_type>>
    class Priority_queue : public Less { ... }

Constructor:

    Priority_queue(Less const &less = Less(),
                   const allocator_type &alloc = allocator_type())
        : Base(less), m_container(alloc) {}

Inserting an event (abridged):

    bool Event_queue::create_event(THD *thd, Event_queue_element *new_element,
                                   bool *created) {
      // Work out when the event should run next.
      new_element->compute_next_execution_time(thd);
      // *created is true when push() returns false.
      *created = (queue.push(new_element) == false);
      // Wake any thread waiting on the queue state.
      mysql_cond_broadcast(&COND_queue_state);
    }
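The idea behind the queue (keep the event that is due soonest on top, and wake waiters whenever the queue changes) can be sketched with the standard library. This is only an illustration. `SketchEvent`, `LaterFirst` and `SketchEventQueue` are hypothetical names, `std::priority_queue` stands in for MySQL's own `Priority_queue`, and `std::condition_variable::notify_all` plays the role of `mysql_cond_broadcast`.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>
#include <string>
#include <vector>

// Hypothetical, simplified element; the real Event_queue_element holds more.
struct SketchEvent {
  std::string name;
  std::int64_t execute_at;  // next execution time, e.g. seconds since epoch
};

// std::priority_queue keeps the "largest" element on top, so the comparison
// is inverted to put the event with the smallest execute_at first.
struct LaterFirst {
  bool operator()(const SketchEvent *a, const SketchEvent *b) const {
    return a->execute_at > b->execute_at;
  }
};

class SketchEventQueue {
 public:
  // Inserts an event and wakes any thread waiting for the queue to change.
  void create_event(SketchEvent *ev) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(ev);
    }
    cond_queue_state_.notify_all();  // stand-in for mysql_cond_broadcast
  }

  // Returns the event due soonest, or nullptr if the queue is empty.
  SketchEvent *top() {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.empty() ? nullptr : queue_.top();
  }

  // Removes the event due soonest; the queue must not be empty.
  void pop() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(!queue_.empty());
    queue_.pop();
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return queue_.size();
  }

 private:
  std::mutex mutex_;
  std::condition_variable cond_queue_state_;
  std::priority_queue<SketchEvent *, std::vector<SketchEvent *>, LaterFirst>
      queue_;
};
```

The queue stores raw pointers and does not own the events, so the caller must keep them alive for as long as they are queued.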
Event_scheduler
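Class definition (abridged):

    class Event_scheduler {
      // enum enum_state { INITIALIZED = 0, RUNNING, STOPPING };
      enum enum_state state;
      THD *scheduler_thd;        // THD of the scheduler thread
      Event_queue *queue;        // the queue passed to the constructor
      ulonglong started_events;  // counter of started events
    };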
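To make the role of the state field more concrete, here is a minimal sketch of a scheduler that moves through INITIALIZED, RUNNING and STOPPING while it starts and stops a worker thread. It is not the MySQL implementation. `SketchScheduler` and its members are hypothetical, `std::thread` replaces the server's own thread creation, and the return to INITIALIZED after a stop is an assumption of the sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class SketchScheduler {
 public:
  enum enum_state { INITIALIZED = 0, RUNNING, STOPPING };

  ~SketchScheduler() { stop(); }

  // Returns true on failure (here: the scheduler is not in INITIALIZED).
  bool start() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (state_ != INITIALIZED) return true;
    state_ = RUNNING;
    worker_ = std::thread(&SketchScheduler::run, this);
    return false;
  }

  // Asks the worker to exit, waits for it, then goes back to INITIALIZED.
  void stop() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (state_ != RUNNING) return;
      state_ = STOPPING;
    }
    cond_.notify_all();
    assert(worker_.joinable());
    worker_.join();
    std::lock_guard<std::mutex> lock(mutex_);
    state_ = INITIALIZED;
  }

  enum_state state() {
    std::lock_guard<std::mutex> lock(mutex_);
    return state_;
  }

 private:
  // Worker body: a real scheduler would execute due events here; this one
  // only sleeps until stop() switches the state to STOPPING.
  void run() {
    std::unique_lock<std::mutex> lock(mutex_);
    cond_.wait(lock, [this] { return state_ == STOPPING; });
  }

  std::mutex mutex_;
  std::condition_variable cond_;
  enum_state state_ = INITIALIZED;
  std::thread worker_;
};
```

Guarding start() with the state check means a second call while the worker is running is rejected instead of spawning another thread.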