Для очередной статьи на Хабре нужно было реализовать собственный диспетчер, заточенный под конкретную специфическую задачу. Первоначально я этот диспетчер начал делать "в лоб", с собственными очередями, mutex-ами, condition_variable и вот этим вот всем.
Однако, уже по ходу реализации в голову пришла простая мысль о том, что все тоже самое можно получить и используя CSP-шные каналы, благо в SO-5 они уже давно есть. В общем, было сделано две реализации. Одна на нитях/mutex/condition_variable. Вторая -- на SObjectizer-овских mchain-ах. То, какая в итоге вышла разница, можно увидеть под катом.
Итак, слева первоначальный вариант на нитях/mutex/condition_variable, справа -- итоговый на mchain-ах. При этом в первоначальном варианте была допущена серьезная ошибка, которая была найдена и исправлена не сразу.
class tricky_dispatcher_t : public std::enable_shared_from_this<tricky_dispatcher_t> { friend class tricky_event_queue_t; friend class tricky_disp_binder_t; class tricky_event_queue_t : public so_5::event_queue_t { tricky_dispatcher_t & disp_; public: tricky_event_queue_t(tricky_dispatcher_t & disp) : disp_{disp} {} virtual void push(so_5::execution_demand_t demand) override { disp_.push_demand(std::move(demand)); } }; class tricky_disp_binder_t : public so_5::disp_binder_t { std::shared_ptr<tricky_dispatcher_t> disp_; public: tricky_disp_binder_t(std::shared_ptr<tricky_dispatcher_t> disp) : disp_{std::move(disp)} {} virtual so_5::disp_binding_activator_t bind_agent( so_5::environment_t &, so_5::agent_ref_t agent) override { return [d = disp_, agent] { agent->so_bind_to_dispatcher(d->event_queue_); }; } virtual void unbind_agent( so_5::environment_t &, so_5::agent_ref_t) override { // Ничего не нужно делать. } }; // Тип очереди заявок. using demand_queue_t = std::deque<so_5::execution_demand_t>; // Тип стека для ожидающих своей работы очередей. using cond_var_stack_t = std::stack<std::condition_variable *>; // Тип контейнера для рабочих очередей. using thread_pool_t = std::vector<std::thread>; // Вспомогательный тип, который потребуется для нотификации о том, // что диспетчер должен завершить свою работу. struct shutdown_t {}; // Объект, реализующий интерфейс so_5::event_queue_t для того, // чтобы выполнять привязку агентов к диспетчеру. tricky_event_queue_t event_queue_; // Замок объекта. Все модификации очередей заявок будут выполняться // под этим замком. std::mutex lock_; // Признак того, что диспетчер должен завершить свою работу. bool shutdown_{false}; // Очередь заявок для сообщений init_device и reinit_device. demand_queue_t init_reinit_queue_; // Очередь для всех остальных сообщений. demand_queue_t other_demands_queue_; // Рабочие нити первого типа, которые ждут когда для них найдется работа. cond_var_stack_t sleeping_first_type_threads_; // Рабочие нити второго типа, которые ждут когда для них найдется работа. cond_var_stack_t sleeping_second_type_threads_; // Рабочие очереди этого диспетчера. thread_pool_t work_threads_; static const std::type_index init_device_type; static const std::type_index reinit_device_type; // Вспомогательный метод для вычисления размеров подпулов. static auto calculate_pools_sizes(unsigned pool_size) { if( 2u == pool_size) // Всего две очереди в пуле. Делим пополам. return std::tuple{1u, 1u}; else { // Нитей первого типа будет 3/4 от общего количества. const auto first_pool_size = (pool_size/4u)*3u; return std::tuple{first_pool_size, pool_size - first_pool_size}; } } // Вспомогательный метод для того, чтобы взять первую спящую рабочую // нить и разбудить ее. static void wake_up_thread_from(cond_var_stack_t & vars) { auto v = vars.top(); vars.pop(); v->notify_one(); } // Вспомогательный метод для того, чтобы завершить работу всех нитей. void shutdown_work_threads() noexcept { // Сначала нужно выставить флаг shutdown для того, чтобы все // работающие нити поняли, что пришло время завершать свою работу. // Это нужно делать под захваченным замком объекта. { std::lock_guard l{lock_}; shutdown_ = true; // Если есть спящие нити, то их следует разбудить. while(!sleeping_first_type_threads_.empty()) wake_up_thread_from(sleeping_first_type_threads_); while(!sleeping_second_type_threads_.empty()) wake_up_thread_from(sleeping_second_type_threads_); } // Теперь можно дождаться момента, когда все рабочие нити закончат // свою работу. for(auto & t : work_threads_) t.join(); // Пул рабочих нитей должен быть очищен. work_threads_.clear(); } // Запуск всех рабочих нитей. // Если в процессе запуска произойдет сбой, то ранее запущенные нити // должны быть остановлены. void launch_work_threads( unsigned first_type_threads_count, unsigned second_type_threads_count) { work_threads_.reserve(first_type_threads_count + second_type_threads_count); try { for(auto i = 0u; i < first_type_threads_count; ++i) work_threads_.emplace_back([this]{ first_type_thread_body(); }); for(auto i = 0u; i < second_type_threads_count; ++i) work_threads_.emplace_back([this]{ second_type_thread_body(); }); } catch(...) { shutdown_work_threads(); throw; // Пусть с исключениями разбираются выше. } } // Тело рабочей нити первого типа. void first_type_thread_body() { // Нам потребуется знать идентфикатор текущей нити для того, // чтобы запускать обработчики событий агента с эти идентификатором. const auto thread_id = so_5::query_current_thread_id(); // Так же нам потребуется собственный condition_variable, который // будет нас будить когда поступит заявка для обработки. std::condition_variable cond_var; // Эта вспомогательная функция будет извлекать заявку из очередей или // спать до тех пор, пока что-нибудь произойдет. const auto extractor = [&]() -> std::variant<shutdown_t, so_5::execution_demand_t> { // Все операции выполняются под захваченным замком. std::unique_lock l{lock_}; while(true) { if(shutdown_) return shutdown_t{}; else if(!init_reinit_queue_.empty()) { auto d = std::move(init_reinit_queue_.front()); init_reinit_queue_.pop_front(); if(!init_reinit_queue_.empty() && !sleeping_first_type_threads_.empty()) // Нужно дать поработать еще одной нити. wake_up_thread_from(sleeping_first_type_threads_); return d; } else if(!other_demands_queue_.empty()) { auto d = std::move(other_demands_queue_.front()); other_demands_queue_.pop_front(); if(!other_demands_queue_.empty()) { // Нужно дать поработать еще одной нити. if(!sleeping_second_type_threads_.empty()) wake_up_thread_from(sleeping_second_type_threads_); else if(!sleeping_first_type_threads_.empty()) wake_up_thread_from(sleeping_first_type_threads_); } return d; } else { // Все очереди пусты, нужно заснуть до появления работы. sleeping_first_type_threads_.push(&cond_var); cond_var.wait(l); } } }; // Выполняем работу до тех пор, пока не получим сигнал shutdown. while(true) { auto r = extractor(); if(std::get_if<shutdown_t>(&r)) return; else { auto & d = std::get<so_5::execution_demand_t>(r); d.call_handler(thread_id); } } } // Тело рабочей нити второго типа. void second_type_thread_body() { // Нам потребуется знать идентфикатор текущей нити для того, // чтобы запускать обработчики событий агента с эти идентификатором. const auto thread_id = so_5::query_current_thread_id(); // Так же нам потребуется собственный condition_variable, который // будет нас будить когда поступит заявка для обработки. std::condition_variable cond_var; // Эта вспомогательная функция будет извлекать заявку из очередей или // спать до тех пор, пока что-нибудь произойдет. const auto extractor = [&]() -> std::variant<shutdown_t, so_5::execution_demand_t> { // Все операции выполняются под захваченным замком. std::unique_lock l{lock_}; while(true) { if(shutdown_) return shutdown_t{}; else if(!other_demands_queue_.empty()) { auto d = std::move(other_demands_queue_.front()); other_demands_queue_.pop_front(); if(!other_demands_queue_.empty()) { // Нужно дать поработать еще одной нити. if(!sleeping_second_type_threads_.empty()) wake_up_thread_from(sleeping_second_type_threads_); else if(!sleeping_first_type_threads_.empty()) wake_up_thread_from(sleeping_first_type_threads_); } return d; } else { // Все очереди пусты, нужно заснуть до появления работы. sleeping_second_type_threads_.push(&cond_var); cond_var.wait(l); } } }; // Выполняем работу до тех пор, пока не получим сигнал shutdown. while(true) { auto r = extractor(); if(std::get_if<shutdown_t>(&r)) return; else { auto & d = std::get<so_5::execution_demand_t>(r); d.call_handler(thread_id); } } } // Сохранение очередной заявки в очередях заявок. void push_demand(so_5::execution_demand_t demand) { // Делаем все действия обязательно под захваченным замком. std::lock_guard l{lock_}; if(init_device_type == demand.m_msg_type || reinit_device_type == demand.m_msg_type) { // Эти заявки должны идти в свою собственную очередь. const bool was_empty = init_reinit_queue_.empty(); init_reinit_queue_.emplace_back(std::move(demand)); // Если есть какая-то нить, которая ждет работу, то разбудим ее. if(was_empty && !sleeping_first_type_threads_.empty()) wake_up_thread_from(sleeping_first_type_threads_); } else { // Это заявка, которая должна попасть в общую очередь. const bool was_empty = other_demands_queue_.empty(); other_demands_queue_.emplace_back(std::move(demand)); if(was_empty) { // Возможно, какая-то из рабочих нитей ждала работу для себя. // Попробуем с этим разобраться. if(!sleeping_second_type_threads_.empty()) wake_up_thread_from(sleeping_second_type_threads_); // Рабочие потоки первого типа так же могут обработать такую // заявку, если другой работы для них нет. else if(!sleeping_first_type_threads_.empty()) wake_up_thread_from(sleeping_first_type_threads_); } } } public: // Конструктор сразу же запускает все рабочие нити. tricky_dispatcher_t( // Количество рабочих потоков, которые должны быть созаны диспетчером. unsigned pool_size) : event_queue_{*this} { const auto [first_type_count, second_type_count] = calculate_pools_sizes(pool_size); launch_work_threads(first_type_count, second_type_count); } ~tricky_dispatcher_t() noexcept { // Все работающие нити должны быть остановлены. shutdown_work_threads(); } // Метод-фабрика для создания экземпляров диспетчера. static auto make(unsigned pool_size) { return std::make_shared<tricky_dispatcher_t>(pool_size); } // Создать биндера, который сможет привязать агента к этому диспетчеру. so_5::disp_binder_unique_ptr_t binder() { return so_5::disp_binder_unique_ptr_t{ new tricky_disp_binder_t{shared_from_this()}}; } }; const std::type_index tricky_dispatcher_t::init_device_type = typeid(a_device_manager_t::init_device_t); const std::type_index tricky_dispatcher_t::reinit_device_type = typeid(so_5::mutable_msg<a_device_manager_t::reinit_device_t>); |
class tricky_dispatcher_t : public std::enable_shared_from_this<tricky_dispatcher_t> { friend class tricky_event_queue_t; friend class tricky_disp_binder_t; class tricky_event_queue_t : public so_5::event_queue_t { tricky_dispatcher_t & disp_; public: tricky_event_queue_t(tricky_dispatcher_t & disp) : disp_{disp} {} virtual void push(so_5::execution_demand_t demand) override { disp_.push_demand(std::move(demand)); } }; class tricky_disp_binder_t : public so_5::disp_binder_t { std::shared_ptr<tricky_dispatcher_t> disp_; public: tricky_disp_binder_t(std::shared_ptr<tricky_dispatcher_t> disp) : disp_{std::move(disp)} {} virtual so_5::disp_binding_activator_t bind_agent( so_5::environment_t &, so_5::agent_ref_t agent) override { return [d = disp_, agent] { agent->so_bind_to_dispatcher(d->event_queue_); }; } virtual void unbind_agent( so_5::environment_t &, so_5::agent_ref_t) override { // Ничего не нужно делать. } }; // Тип контейнера для рабочих очередей. using thread_pool_t = std::vector<std::thread>; // Объект, реализующий интерфейс so_5::event_queue_t для того, // чтобы выполнять привязку агентов к диспетчеру. tricky_event_queue_t event_queue_; // Каналы, которые будут использоваться в качестве очередей сообщений. so_5::mchain_t init_reinit_ch_; so_5::mchain_t other_demands_ch_; // Рабочие очереди этого диспетчера. thread_pool_t work_threads_; static const std::type_index init_device_type; static const std::type_index reinit_device_type; // Вспомогательный метод для вычисления размеров подпулов. static auto calculate_pools_sizes(unsigned pool_size) { if( 2u == pool_size) // Всего две очереди в пуле. Делим пополам. return std::tuple{1u, 1u}; else { // Нитей первого типа будет 3/4 от общего количества. const auto first_pool_size = (pool_size/4u)*3u; return std::tuple{first_pool_size, pool_size - first_pool_size}; } } // Вспомогательный метод для того, чтобы завершить работу всех нитей. void shutdown_work_threads() noexcept { // Сначала закроем оба канала. so_5::close_drop_content(init_reinit_ch_); so_5::close_drop_content(other_demands_ch_); // Теперь можно дождаться момента, когда все рабочие нити закончат // свою работу. for(auto & t : work_threads_) t.join(); // Пул рабочих нитей должен быть очищен. work_threads_.clear(); } // Запуск всех рабочих нитей. // Если в процессе запуска произойдет сбой, то ранее запущенные нити // должны быть остановлены. void launch_work_threads( unsigned first_type_threads_count, unsigned second_type_threads_count) { work_threads_.reserve(first_type_threads_count + second_type_threads_count); try { for(auto i = 0u; i < first_type_threads_count; ++i) work_threads_.emplace_back([this]{ first_type_thread_body(); }); for(auto i = 0u; i < second_type_threads_count; ++i) work_threads_.emplace_back([this]{ second_type_thread_body(); }); } catch(...) { shutdown_work_threads(); throw; // Пусть с исключениями разбираются выше. } } // Обработчик объектов so_5::execution_demand_t. static void exec_demand_handler(so_5::execution_demand_t d) { d.call_handler(so_5::null_current_thread_id()); } // Тело рабочей нити первого типа. void first_type_thread_body() { // Выполняем работу до тех пор, пока не будут закрыты все каналы. so_5::select(so_5::from_all(), case_(init_reinit_ch_, exec_demand_handler), case_(other_demands_ch_, exec_demand_handler)); } // Тело рабочей нити второго типа. void second_type_thread_body() { // Выполняем работу до тех пор, пока не будут закрыты все каналы. so_5::select(so_5::from_all(), case_(other_demands_ch_, exec_demand_handler)); } // Сохранение очередной заявки в очередях заявок. void push_demand(so_5::execution_demand_t demand) { if(init_device_type == demand.m_msg_type || reinit_device_type == demand.m_msg_type) { // Эти заявки должны идти в свою собственную очередь. so_5::send<so_5::execution_demand_t>(init_reinit_ch_, std::move(demand)); } else { // Это заявка, которая должна попасть в общую очередь. so_5::send<so_5::execution_demand_t>(other_demands_ch_, std::move(demand)); } } public: // Конструктор сразу же запускает все рабочие нити. tricky_dispatcher_t( // SObjectizer Environment, на котором нужно будет работать. so_5::environment_t & env, // Количество рабочих потоков, которые должны быть созаны диспетчером. unsigned pool_size) : event_queue_{*this} , init_reinit_ch_{so_5::create_mchain(env)} , other_demands_ch_{so_5::create_mchain(env)} { const auto [first_type_count, second_type_count] = calculate_pools_sizes(pool_size); launch_work_threads(first_type_count, second_type_count); } ~tricky_dispatcher_t() noexcept { // Все работающие нити должны быть остановлены. shutdown_work_threads(); } // Метод-фабрика для создания экземпляров диспетчера. static auto make(so_5::environment_t & env, unsigned pool_size) { return std::make_shared<tricky_dispatcher_t>(env, pool_size); } // Создать биндера, который сможет привязать агента к этому диспетчеру. so_5::disp_binder_unique_ptr_t binder() { return so_5::disp_binder_unique_ptr_t{ new tricky_disp_binder_t{shared_from_this()}}; } }; const std::type_index tricky_dispatcher_t::init_device_type = typeid(a_device_manager_t::init_device_t); const std::type_index tricky_dispatcher_t::reinit_device_type = typeid(so_5::mutable_msg<a_device_manager_t::reinit_device_t>); |
Комментариев нет:
Отправить комментарий