воскресенье, 2 апреля 2017 г.

[prog.c++] Аннотированный код нового примера для SObjectizer-5.5.19

Вот уже пару месяцев идет активная работа над новой версией SObjectizer -- 5.5.19. Одна из основных фич этой версии уже вполне себе заработала. Речь про возможность запустить SObjectizer Environment в однопоточном режиме. Кратко поясню, в чем суть.

Обычный SObjectizer Environment для выполнения своей работы запускает три дополнительные рабочие нити. Первая принадлежит диспетчеру по умолчанию (к этому диспетчеру агенты привязываются, если пользователь явно не указал, на каком диспетчере агент должен работать). Вторая обслуживает таймер (она так и называется -- таймерная нить). Третья нить нужна для того, чтобы корректно завершать дерегистрацию коопераций и давать возможность диспетчерам завершать рабочие нити, созданные под обслуживание этих коопераций.

Эти три дополнительные нити не являются сколько-нибудь существенными накладными расходами в большом многопоточном приложении, в котором создаются десятки, а то и сотни рабочих потоков. Но бываю случаи, когда SObjectizer используется в небольших утилитах, в которых практически всю работу можно делать на контексте одной-единственной рабочей нити. В таких случаях создание сразу трех(!) дополнительных нитей -- это из пушки по воробьям.

Как раз для поддержки подобных случаев в SO-5.5.19 добавляется возможность создать такой SObjectizer Environment, который будет все свои действия выполнять на одной-единственной нити. Т.е. диспетчер по умолчанию, таймер и окончательная дерегистрация будут работать на той самой нити, на которой был произведен вызов so_5::launch().

Фича эта уже работает, но мы пока еще не проверили ее во всех сценариях, для которых она предназначалась. Поэтому пока не можем считать ее готовой к релизу. Но в процессе ее реализации в список примеров SObjectizer-а был добавлен еще один пример. Мне кажется, он достаточно интересен и демонстрирует использование многопоточности вообще без агентов. Только голые нити и CSP-шные каналы (называющиеся у нас mchain-ы). Ну и очередная разновидность ping-pong-а, только на этот раз с использованием таймеров. Пример, снабженный комментариями, под катом. Кому интересно, милости прошу посмотреть и высказать свое фи авторитетное мнение.

Напоследок скажу, что в скоуп версии 5.5.19 входит еще одна важная фича -- это moveable-сообщения, о которых речь заходила с месяц назад. Работы здесь еще очень много. Так что пока даже не рискну предположить предполагаемую дату релиза. Хорошо бы успеть к концу апреля, но загадывать не берусь. Жаль, что работы над 5.5.19 так затянулись, но однопоточный SObjectizer Environment -- это настолько переворачивающая все с ног на голову переделка SO-5, что торопиться ну очень не хочется.

Итак, сам код:

/*
 * Простой пример, демонстрирующий обмен отложенными сигналами между
 * двумя рабочими нитями посредством mchain-ов.
 *
 * SObjectizer Environment запускается посредством wrapped_env, а в качестве
 * инфраструктуры Environment-а задается simple_mtsafe-инфраструктура
 * (т.е. Environment выполняет все свои действия, включая обслуживание
 * таймеров на контексте одной-единственной нити).
 */

#include <so_5/all.hpp>

// Сообщения для обмена между нитями.
struct tick final : public so_5::signal_t {};
struct tack final : public so_5::signal_t {};

// Специальный сигнал для того, чтобы завершить обмен и закончить работу.
struct stop final : public so_5::signal_t {};

// Вспомогательный класс для уменьшения объема копи-пасты при реализации
// взаимодействующих нитей.
//
// Каждая из взаимодействующих нитей должна хранить два атрибута:
// - размер паузы для отсылки ответного сигнала. Эта пауза должна уменьшаться
//   при получении каждого следующего сообщения;
// - признак необходимости завершения работы. Этот флаг взводится либо при
//   уменьшении паузы ниже определенного порога (5ms). Либо же после получения
//   сигнала stop.
//
// При получении очередного сигнала (tick или tack) нужно попробовать уменьшить
// размер паузы и, если ее размер еще не опустился ниже заданного порога,
// следует отослать ответный сигнал (в ответ на tick нужно отослать tack и
// наоборот). Если же пауза стала меньше допустимого порога, то следует
// отослать сигнал stop и завершить свою работу. Для того, чтобы не повторять
// всю это последовательность при обработке каждого из сигналов, сделан один
// шаблонный метод reply_or_stop, который параметризуется типом ответа (т.е.
// при обработке tick нужно будет вызвать reply_or_stop<tack>).
//
class thread_state final
{
   std::chrono::milliseconds pause_{ 750 };
   bool must_stop_{ false };

public :
   bool must_stop() const { return must_stop_; }

   template<typename Reply>
   void reply_or_stop(
      // to -- это mchain, в который следует отсылать ответ, либо же
      // сигнал stop, если пришло время завершить работу.
      const so_5::mchain_t & to )
   {
      if( pause_ > std::chrono::milliseconds(5) )
      {
         pause_ = std::chrono::milliseconds(
               static_castdecltype(pause_.count()) >(pause_.count() / 1.5) );
         so_5::send_delayed< Reply >( to, pause_ );
      }
      else
      {
         so_5::send< stop >( to );
         must_stop_ = true;
      }
   }
};

// Тело рабочей нити для обмена сообщениями.
void thread_body(
   // Откуда читать входящие сообщения.
   so_5::mchain_t recv_chain,
   // Куда писать исходящие сообщения.
   so_5::mchain_t write_chain )
{
   thread_state state;
   // Основной цикл работы нити выполняется внутри одного вызова receive.
   // Внутри receive читаются сообщения из потока recv_chain.
   // Выход из receive произойдет либо при закрытии recv_chain, либо
   // если предикат state.must_stop() возвратит true.
   receive(
      // Здесь задается откуда читать входящие и при каких условиях
      // чтение нужно завершить (контролем условий выхода должна заниматься
      // лямбда-функция, переданная в stop_on()).
      from(recv_chain).stop_on( [&state]{ return state.must_stop(); } ),
      // Реакция на входящее сообщение tick.
      [&]( so_5::mhood_t<tick> ) {
         std::cout << "Tick!" << std::endl;
         state.reply_or_stop< tack >( write_chain );
      },
      // Реакция на входящее сообщение tack.
      [&]( so_5::mhood_t<tack> ) {
         std::cout << "Tack!" << std::endl;
         state.reply_or_stop< tick >( write_chain );
      } );
}

// Маленькая вспомогательная функция для подготовки объекта типа
// environment_params_t с нужными значениями внутри. В принципе, без этой
// функции вполне можно было бы обойтить и сделать все настройки внутри функции
// main(). Но т.к. приходится иметь дело с компиляторами, которые не полностью
// поддерживают C++11, то такая настройка пока еще выглядит довольно
// многословно. Поэтому ее проще вынести в отдельную функцию дабы уменьшить
// количество строк в main().
so_5::environment_params_t make_env_params()
{
   so_5::environment_params_t env_params;
   // Вот так указывается, что вместо стандартной многопоточной
   // SObjectizer-овской инфраструктуры следует использовать однопоточную
   // инфраструктуру, но обеспечивающую thread-safety.
   env_params.infrastructure_factory(
         so_5::env_infrastructures::simple_mtsafe::factory() );
   return env_params;
}

int main()
{
   // Создаем SObjectizer Environment и сразу запускаем его.
   // Стартует обработка таймеров, появляется возможность отсылать
   // отложенные сообщения.
   so_5::wrapped_env_t sobj( make_env_params() );

   // Объект для второй нити, которая будет участвовать в обмене сообщениями.
   // Сама нить будет запущена позже. Этот же объект нужен для того, чтобы
   // задействовать автоматический вызов join() для второй нити при выходе
   // из скоупа.
   std::thread second_thread;
   // Функция auto_join возвращает объект, в деструкторе которого автоматически
   // вызовется join() для объекта second_thread. Благодаря этому нам не нужно
   // беспокоиться о вызове join() даже если возврат произойдет из-за
   // исключения.
   const auto thread_joiner = so_5::auto_join( second_thread );

   // Создаем два канала, которые будут использоваться для обмена сообщениями.
   auto ch1 = create_mchain( sobj );
   auto ch2 = create_mchain( sobj );
   // Длаем так, чтобы каналы автоматически закрылись при выходе из скоупа.
   // Если каналы не зарывать явным образом, то при преждевременном выходе
   // из main() может оказаться, что вторая нить будет висеть внутри receive().
   const auto ch_closer = so_5::auto_close_drop_content( ch1, ch2 );

   // Вот теперь можно запустить вторую нить. Если после этой точке в main()
   // возникнет исключение или просто случится преждевременный выход из main()
   // из-за использования return-а, то сперва будут закрыты каналы ch1 и ch2,
   // что приведет к выходу из receive внутри рабочих нитей.
   second_thread = std::thread( thread_body, ch2, ch1 );

   // Для того, чтобы обмен между нитями пошел, требуется самое первое
   // сообщение, с которого все и начнется. Отправляем его сразу же в первый
   // канал.
   so_5::send< tick >( ch1 );

   // Вторая нить уже запущена. А в качестве первой нити будет главная нить
   // приложения. Возврат из thread_body возможен либо после нормального
   // завершения (уменьшение значение pause ниже установленного порога с
   // последующим сигналом stop), либо в результате исключения.
   thread_body( ch1, ch2 );

   // Просто завершаем работу приложения. Созданные каналы будут автоматически
   // закрыты посредством вспомогательного объекта ch_closer. А для второй
   // нити автоматически будет вызван join() посредством вспомогательного
   // объекта thread_joiner.
   return 0;
}
Отправить комментарий