понедельник, 26 января 2015 г.

[prog.c++.sobjectizer] Разбор примера простой реализации дедлайнов для сообщений

В продолжение темы, начатой заметками "Реализация дедлайнов для сообщений своими руками" и "Агент-collector + агент-performer = паттерн для нескольких ситуаций", в SObjectizer добавлен еще один пример simple_message_deadline. Оказалось, что в данном примере задействуется много ключевых возможностей SObjectizer-а, поэтому имеет смысл разобрать пример подробнее, объясняя, по ходу рассказа, что к чему и почему. Рассказ я постарался сделать максимально простым и подробным, ориентированным на читателя, который либо вообще не имеет представления о SObjectizer-е, либо имеет самое поверхностное.

Что делает пример? Один агент-generator инициирует запросы, у каждого из которых есть дедлайн. Т.е. для запроса задано время, до наступления которого обработка запроса должна быть начата. Если этого не случилось, то запрос должен быть отвергнут.

Агент-collector, который собирает запросы от агента-generator-а в очередь и по одному отдает их агенту-performer-у. Агент-performer имитирует обработку запросов, затрачивая по четыре секунды на каждый полученный им запрос. За то время, пока агент-performer "занимается" обработкой, у части сохраненных агентом-collector-ом запросов может наступить дедлайн. Это контролируется агентом-collector-ом, который выбрасывает просроченные запросы из очереди.

Вот такая незамысловатая логика работы. Теперь можно перейти к разбору того, как это реализовано в коде.

Сначала нужно описать прикладные сообщения, которые будут ходить между агентами. В SObjectizer каждое сообщение должно иметь свой собственный уникальный тип, т.к. диспетчеризация и обработка сообщений происходит по их типу.

Первое сообщение -- это запрос, который идет от generator-а к collector-у, а затем и к performer-у:

// Запрос, который будет отсылаться генератором.
struct msg_request : public so_5::rt::message_t
{
   std::string m_id;
   std::time_t m_deadline;
   const so_5::rt::mbox_t m_reply_to;

   msg_request(
      std::string id,
      std::time_t deadline,
      const so_5::rt::mbox_t & reply_to )
      :  m_id( std::move( id ) )
      ,  m_deadline( deadline )
      ,  m_reply_to( reply_to )
   {}
};

Сообщение содержит минимальное количество полей, необходимых для работы примера. Некий идентификатор, который поможет сопоставлять запросы и ответы. Дедлайн, до наступления которого запрос должен поступить на обработку. И обратный адрес, на который должен быть отослан ответ на запрос (в SObjectizer сообщения отсылаются в "почтовые ящики", называемые mbox-ами, а ссылки на эти почтовые ящики хранятся в виде объектов mbox_t).

Далее идет определение удобного псевдонима, который позволит в некоторых местах писать более лаконично:

// Удобный псевдоним умного указателя на экземпляр запроса.
using msg_request_smart_ptr_t = so_5::intrusive_ptr_t< msg_request >;

Нужен этот псевдоним вот для чего. Сообщения в SObjectizer отсылаются в виде динамически созданных объектов. Время жизни экземпляра сообщения контролируется SObjectizer-ом посредством использования интрузивного умного указателя intrusive_ptr_t. Экземпляр сообщения автоматически уничтожается после того, как исчезнут все ссылки на него. Т.е. когда сообщение будет обработано всеми подписчиками.

В данном примере агент-collector получив сообщение msg_request от generator-а должен сохранить экземпляр сообщения у себя на какое-то время. Сделать это можно либо просто скопировав сообщение (т.е. создав новый экземпляр), либо же сохранив у себя ссылку на уже имеющийся экземпляр посредством умного указателя. Второй подход более эффективен, т.к. не требует повторного выделения памяти под msg_request. Именно поэтому он используется агентом-collector-ом. А чтобы не писать intrusive_ptr_t<msg_request> вводится более удобное имя msg_request_smart_ptr_t.

Далее необходимы ответные сообщения, которые будут сообщать generator-у результат обработки запроса. Таких сообщений два. Одно используется для информирования об успешной обработке запроса. Второе -- о неудачной.

// Результат успешной обработки запроса.
struct msg_positive_reply : public so_5::rt::message_t
{
   std::string m_id;
   std::string m_result;
   std::time_t m_started_at;

   msg_positive_reply(
      std::string id,
      std::string result,
      std::time_t started_at )
      :  m_id( std::move( id ) )
      ,  m_result( std::move( result ) )
      ,  m_started_at( started_at )
   {}
};

// Уведомление о том, что запрос обработан не был.
struct msg_negative_reply : public so_5::rt::message_t
{
   std::string m_id;
   std::time_t m_deadline;

   msg_negative_reply(
      std::string id,
      std::time_t deadline )
      :  m_id( std::move( id ) )
      ,  m_deadline( deadline )
   {}
};

В сообшении msg_positive_reply поле m_started_at указывает время, когда сообщение поступило на обработку. В сообщении msg_negative_reply поле m_deadline -- это копия значения поля m_deadline из msg_request. Оно нужно лишь для того, чтобы проще было убедиться в том, что дедлайн для сообщения действительно наступил.

То, что все показанные выше сообщения являются наследниками so_5::rt::message_t пока оставим без объяснений, а чуть ниже разберем этот вопрос чуть подробнее, когда появится использование еще одного базового типа so_5::rt::signal_t.

После объявления всех необходимых прикладных сообщений можно определять агента-generator-а. В данном примере у него тривиальная логика поведения: при старте он отсылает агенту-collector-у серию сообщений msg_request с разными дедлайнами, а затем ждет ответов на свои вопросы. Когда все ответы получены, работа примера завершается.

Для отсылки запросов collector-у нужно знать mbox collector-а. Этот mbox передается генератору в конструктор и сохраняется в виде атрибута агента-generator-а:

class a_generator_t : public so_5::rt::agent_t
{
public :
   a_generator_t(
      so_5::rt::environment_t & env,
      const so_5::rt::mbox_t & processor_mbox )
      :  so_5::rt::agent_t( env )
      ,  m_processor_mbox( processor_mbox )
   {}

Название processor_mbox, а не, скажем, collector_mbox, было выбрано потому, что агент-generator не знает, что за обработку его запросов будет отвечать пара агентов (collector+performer). Для генератора есть только mbox, на который нужно отослать запрос. Скрывается ли за этим mbox-ом один обработчик, два, три или больше -- уже неважно. Поэтому агент-generator ситает, что есть некий processor для его запросов, mbox которого и передается агенту-generator-у в конструкторе.

Для того, чтобы получать ответные сообщения агент-generator должен подписаться на них. У агентов в SObjectizer-е есть специальный метод so_define_agent(), который предназначен для выполнения подписки на интересующие агента сообщения. Агент-generator подписывает два своих события (можно сказать, что событие -- это метод-обработчик, который вызывается у агента для реакции на сообщение):

   virtual void
   so_define_agent() override
   {
      so_default_state()
            .event( &a_generator_t::evt_positive_reply )
            .event( &a_generator_t::evt_negative_reply );
   }

Агент-generator обладает очень простой логикой, поэтому ему не нужно менять свое состояние и оба своих события агент подписывает для состояния по-умолчанию, которое есть у всех SObjectizer-овских агентов. Метод so_default_state() возвращает ссылку на соответствующий объект-состояние, а методы event() этого объекта выполняют подписку событий, реализованных методами evt_positive_reply() и evt_negative_reply().

В SObjectizer существует несколько вариантов метода event() для подписки событий агента. В данном случае используется самый минималистичный и простой вариант -- события агента подписываются на сообщения, которые адресованы персонально этому агенту-generator-у. Тип сообщения, которое должно приводить к возникновению события, определяется автоматически на основании сигнатуры метода-события. В данном случае сигнатуры следующие:

void evt_positive_reply( const msg_positive_reply & evt );
void evt_negative_reply( const msg_negative_reply & evt );

Что позволяет SObjectizer-у самостоятельно определить, что событие evt_positive_reply должно возникать когда агенту присылают сообщение msg_positive_reply, а событие evt_negative_reply -- когда присылают сообщение msg_negative_reply.

Для упрощения примера все заявки агент-generator рассылает в начале своей работы. У агентов в SObjectizer-е есть метод so_evt_start(), который SObjectizer автоматически вызывает при запуске агента. Агент-generator использует этот метод для того, чтобы отослать серию запросов на processor_mbox:

virtual void
so_evt_start() override
{
   // Информация для вычисления дедлайнов отсылаемых сообщений.
   unsigned int delays[] = { 145391512 };

   const std::time_t now = std::time(nullptr);

   int i = 0;
   forauto d : delays )
   {
      // Создаем идентификатор сообщения.
      std::ostringstream idstream;
      idstream << "i=" << i << ";d=" << d;
      const std::string id = idstream.str();

      // Вычисляем дедлайн сообщения.
      const std::time_t deadline = now + d;

      // Отсылаем запрос на обработку.
      so_5::send< msg_request >( m_processor_mbox,
            id,
            deadline,
            so_direct_mbox() );

      std::cout << "sent: [" << id << "], deadline: "
            << time_to_string( deadline ) << std::endl;

      // Количество отосланных запросов увеличилось,
      // должно увеличится количество ожидаемых ответов.
      ++m_expected_replies;
      ++i;
   }
}

Внутри метода so_evt_start() к SObjectizer-у имеет отношение только шаблонная функция so_5::send. Она используется для конструирования и отсылки сообщения типа msg_request. Первым параметром для send-а является mbox, на который сообщение должно быть отослано. Остальные параметры передаются в конструктор нового экземпляра msg_request. Этот экземпляр будет сконструирован внутри send, пользователю не нужно делать этого вручную.

Последним параметром конструктора msg_request (и, соответственно, последним аргументом при вызове send) является mbox, на который должен придти ответ. В качестве этого mbox-а указывается персональный mbox агента-generator-а. Каждый агент имеет свой личный mbox, доступ к этому mbox-у осуществляется посредством метода so_direct_mbox(). Название direct_mbox используется потому, что в SObjectizer есть два типа mbox-ов. Самый простой из них реализует модель multi-producer/single-consumer, т.е. сообщение в mbox может отослать кто угодно, а вот получить сообщение из mbox-а может только единственный агент-владелец. Таким образом mbox подобного типа является прямым каналом для связи с агентом, отсюда и название direct_mbox. Есть так же и тип multi-producer/multi-consumer mbox-а, но в этом примере он не используется.

Итак, агент-generator в so_evt_start() отсылает серию запросов после чего SObjectizer будет вызывать события агента-generator-а при поступлении ответных сообщений. В обработчиках ответов агент-generator не делает ничего особенного, просто печатает на консоль то, что к нему поступает. Внимание можно обратить разве что на подсчет количества полученных ответов и на действие, которое агент предпринимает, когда все ответы получены:

   void
   count_reply()
   {
      --m_expected_replies;
      if( !m_expected_replies )
         so_deregister_agent_coop_normally();
   }

Когда все ответы до агента дошли, агент инициирует дерегистрацию кооперации, в которую он входит. Кооперация -- это совокупность агентов, которые совместно выполняют какую-то работу. В данном случае в кооперацию входит три агента: generator, collector и performer. Для данного примера нужны все три агента, взаимодействующие друг с другом, каждый по отдельности не имеет особого смысла. Следовательно, в начале работы все три агента должны одновременно появиться, а когда действия примера завершаются, все три агента должны исчезнуть.

Именно поэтому агенты объеденены в кооперацию: они вместе появляются внутри SObjectizer-а, вместе и покидают SObjectizer. Вызов метода so_deregister_agent_coop_normally() как раз дает команду изъять кооперацию и удалить входящих в нее агентов. При дерегистрации кооперации нужно указать причину -- выполняется ли дерегистрация в соответствии с нормальной логикой работы или же из-за ошибки. В данном случае дерегистрация -- это нормальное действие, поэтому используется метод so_deregister_agent_coop_normally().

В данном примере регистрируется всего одна кооперация. Когда она дерегистрируется, SObjectizer понимает, что работающих агентов больше не осталось, и завершает свою работу.

Самый сложный агент в данном примере -- это агент-collector. Перед тем, как рассматривать его код, имеет смысл объяснить логику его работы.

Агент-collector получает запросы от generator-а и переадресует их агенту-performer-у. Т.к. запросы могут приходить быстрее, чем performer будет их обрабатывать, агенту-collector-у приходится сохранять запросы у себя. Для чего используется приоритетная очередь. Запросы с наиболее близким дедлайном получают наивысший приоритет и оказываются в начале очереди.

Запросы пересылаются агенту-performer-у. Когда performer завершает обработку очередного запроса, он информирует агента-collector-а об этом посредством уведомления msg_select_next_job. Когда агент-collector получает msg_select_next_job, он берет первую заявку из очереди и пересылает ее агенту-performer-у.

У каждого сообщения есть дедлайн и агент-collector должен контролировать дедлайны. Для этого при сохранении сообщения в очереди агент-collector взводит таймер. При срабатывании таймера collector получит сигнал и сможет выбросить из очереди сообщение, дедлайн которого натупил (если оно еще есть в очереди).

При получении msg_request агент-collector может оказаться в двух ситуациях:

  • агент-performer свободен и новый msg_request может быть сразу отослан на обработку;
  • агент-performer занят обработкой ранее переданного ему запроса и новый msg_request должен быть сохранен в очереди.

В коде агента-collector-а эти две ситуации выражаются явно путем объявления двух состояний для агента:

   const so_5::rt::state_t st_performer_is_free = so_make_state();
   const so_5::rt::state_t st_performer_is_busy = so_make_state();

Состояние определяет, какие сообщения посредством каких событий будут обрабатываться, когда агент находится в этом состоянии. Это показывается в методе so_define_agent() агента-collector-а при подписке агента на сообщения:

   virtual void
   so_define_agent() override
   {
      this >>= st_performer_is_free;

      st_performer_is_free
         .event( &a_collector_t::evt_first_request );

      st_performer_is_busy
         .event( &a_collector_t::evt_yet_another_request )
         .event< msg_select_next_job >( &a_collector_t::evt_select_next_job )
         .event< msg_check_deadline >( &a_collector_t::evt_check_deadline );
   }

Здесь видно, что в состоянии st_perfomer_is_free агент реагирует только на одно событие -- получение нового msg_request. Обработка этого события выполняется методом evt_first_request.

А вот в состоянии st_performer_is_busy агенту нужно обрабатывать больше событий. Событие evt_yet_another_request отвечает за обработку очередного msg_request. Событие evt_select_next_job -- за выдачу первого msg_request из внутренней очереди освободившемуся агенту-performer-у. Событие evt_check_deadline -- за проверку наступления дедлайнов для хранящихся во внутренней очереди запросов.

Почему в состоянии st_performer_is_busy выполняется больше действий, чем в состоянии st_performer_is_free? Потому, что когда агент-performer свободен, очередь заявок в collector-е пуста из-за чего нет надобности реагировать на просьбу выдать следующий запрос (очередь пуста, выдавать нечего). Как и нет надобности проверять дедлайны -- очередь пуста, в ней нет заявок. Поэтому msg_select_next_job и msg_check_deadline в состоянии st_performer_is_free просто игнорируются.

Первой же операцией в so_define_agent() является перевод агента-collector-а в состояние st_performer_is_free. Это обязательно нужно сделать, т.к. изначально агент находится в состоянии по-умолчанию. И, если агента не перевести из него в состояние st_performer_is_free, то агент-collector не будет работать, т.к. подписки сделаны для st_performer_is_free и st_performer_is_busy, а не для состояния по-умолчанию.

Глядя на код so_define_agent() можно увидеть, что в нем используются две формы методов event() для подписки событий. В одном случае event() получает только указатель на метод-обработчик события. Этот случай уже рассматривался выше для агента-generator-а. Во втором случае параметром шаблона задается еще и имя типа сообщения. Этот вариант event() нужен для случая, когда по сигнатуре метода-обработчика нельзя определить, какое сообщение приводит к возникновению события. Действительно, методы evt_select_next_job() и evt_check_deadline() не имеют аргументов вообще:

void evt_select_next_job();
void evt_check_deadline();

Все дело в том, что в SObjectizer-е различаются два типа сообщений. Первый тип -- это собственно полноценные сообщения, которые переносят в себе информацию. Примерами являются сообщения msg_request, msg_positive_reply и msg_negative_reply. Методы-обработчики таких сообщений должны получать экземпляр сообщения в качестве параметра, иначе смысла в обработчике нет. Поэтому при подписке обработчика полноценного сообщения в методе event() нет необходимости указывать тип сообщения явным образом -- он выводится автоматически.

Второй тип -- это сигналы. Т.е. это сообщения, которые не переносят никакой дополнительной информации кроме факта собственного существования. Пример -- это уведомление msg_select_next_job от агента-performer-а. Важен сам факт того, что агент закончил с текущей заявкой и готов перейти к следующей. Поэтому больше никакой информации с msg_select_next_job не связано.

Внутри SObjectizer-а механизмы доставки сообщений и сигналов несколько отличаются друг от друга. Сообщение обязательно должно быть представлено каким-то объектом (экземпляром), ссылка на который передается в событие. А вот для сигнала никаких экземпляров нет, есть только информация о сигнале конкретного типа. Поэтому в метод-обработчик передавать нечего. Отсюда и формат обработчиков сигналов -- метод без параметров. А раз метод-обработчик не имеет параметров, то вывести тип сигнала-инцидента нет возможности и пользователю нужно указывать тип сигнала явно:

st_performer_is_busy
   .event( &a_collector_t::evt_yet_another_request )
   .event< msg_select_next_job >( &a_collector_t::evt_select_next_job )
   .event< msg_check_deadline >( &a_collector_t::evt_check_deadline );

Различия между сообщениями и сигналами еще и в том, что тип для сообщения с данными должен быть наследником so_5::rt::message_t. А тип для сигнала должен наследоваться от so_5::rt::signal_t. Что можно видеть в коде агента-collector-а:

struct msg_select_next_job : public so_5::rt::signal_t {};
struct msg_check_deadline : public so_5::rt::signal_t {};

Такое требование с наследованием либо от message_t, либо от signal_t, позволяет SObjectizer-у контролировать корректность действий пользователя. Так, если тип сообщения отнаследован от message_t, то SObjectizer будет гарантировать, что в обработчик сообщения будет передана корректная ссылка и что пользователь не сможет отослать в качестве сообщения нулевой указатель.

Если посмотреть на сигнатуры методов-обработчиков в агенте-generator-е и агенте-collector-е, то можно заметить, что они существенно отличаются. В первом случае обработчик получает ссылку на экземпляр сообщения:

void evt_positive_reply( const msg_positive_reply & evt );
void evt_negative_reply( const msg_negative_reply & evt );

Тогда как во втором случае -- ссылку на вспомогательный SObjectizer-овский объект event_data_t:

void evt_first_request(
    const so_5::rt::event_data_t< msg_request > & evt )
void evt_yet_another_request(
    const so_5::rt::event_data_t< msg_request > & evt )

Первая форма является наиболее общеупотребительной и удобной -- обработчик события просто получает ссылку на экземпляр сообщения-инцидента. Вторая форма используется, в основном, в случаях, когда нужно иметь возможность сохранить указатель на экземпляр сообщения-инцидента. Именно это и нужно агенту-collector-у. Для того, чтобы не пересоздавать сообщение msg_request заново, агент-collector использует тот же самый экземпляр сообщения (при перепосылке его агенту-performer-у и при сохранении msg_request-а в своей очереди). Для получения умного указателя на экземпляр сообщения-инцидента используется метод make_reference() класса event_data_t.

Реакция на msg_request в состоянии st_performer_is_free (т.е. когда агент-perfomer не занят обработкой запросов) тривиальна -- нужно просто сменить состояние на st_performer_is_busy и переслать этот же экземпляр сообщения агенту-performer-у:

void
evt_first_request( const so_5::rt::event_data_t< msg_request > & evt )
{
   this >>= st_performer_is_busy;

   m_performer_mbox->deliver_message( evt.make_reference() );
}

Здесь для перепосылки того же самого экземпляра сообщения используется метод deliver_message(), а не функция send(). В текущей версии SObjectizer-а шаблонные функции send() не поддерживают операцию перепосылки, поэтому задействуется метод mbox-а. Вообще-то шаблонные функции send() -- это всего лишь простые вспомогательные обертки, которые конструируют новый экземпляр сообщения, а затем передают его в mbox-овский deliver_message(). Изначально функций send() вообще не было, но со временем они были добавлены для упрощения жизни разработчикам, в особенности при написании кода внутри C++ных шаблонов.

Реакция на msg_request в состоянии st_performer_is_busy поинтереснее:

void
evt_yet_another_request(
   const so_5::rt::event_data_t< msg_request > & evt )
{
   // Агент-performer занят, поэтому запрос должен быть
   // сохранен в очереди, а дедлайн запроса нужно проконтролировать.

   const std::time_t now = std::time(nullptr);
   if( now < evt->m_deadline )
   {
      m_pending_requests.push( evt.make_reference() );

      // Для простоты на каждый сохраненный в очереди запрос
      // генерируем таймерный сигнал.
      so_5::send_delayed_to_agent< msg_check_deadline >(
            *this,
            std::chrono::seconds( evt->m_deadline - now ) );
   }
   else
   {
      // Дедлайн для сообщения наступил пока сообщение шло к нам.
      // Поэтому вместо обработки сразу же отсылается
      // отрицательный ответ.
      send_negative_reply( *evt );
   }
}

Новый запрос сохраняется в очереди. И сразу же агент-collector отсылает себе отложенное сообщение-сигнал для контроля дедлайна сообщения. На всякий случай делается проверка того, что дедлайн еще впереди, дабы предотвратить ситуацию, когда дедлайн уже прошел, но мы этого не заметили и получили отрицательный результат (evt->m_deadline-now).

Для инициации отложенного сигнала используется еще один вариант шаблонной функции send() под названием send_delayed_to_agent(). Эта функция делает отсылку сообщения на персональный mbox указанного ей агента, но не сиюминутную отсылку, а отложенную на указанное количество секунд. SObjectizer поддерживает отложенные и периодические сообщения. Причем SObjectizer может обслуживать десятки и сотни миллионов таких сообщений. Так что создание отдельного таймера для каждого сохраненного во очереди запроса -- это вполне нормальный подход.

В данном примере отслеживание дедлайнов выполняется по самой простой схеме. Для каждой сохраненной в очереди заявки выставляется таймер. Когда таймер срабатывает, агент проверяет первую заявку в очереди на предмет наступления дедлайна. Если дедлайн для заявки наступил (т.е. текущее время оказалось больше времени дедлайна), то заявка из очереди выбрасывается. После чего проверяется новая первая заявка в очереди и т.д. Все это можно увидеть непосредственно в коде обработчика таймерного события:

void
evt_check_deadline()
{
   const std::time_t now = std::time(nullptr);

   while( !m_pending_requests.empty() )
   {
      auto & request = m_pending_requests.top();
      if( now >= request->m_deadline )
      {
         send_negative_reply( *request );
         m_pending_requests.pop();
      }
      else
         break;
   }
}

Когда агент-performer завершает обработку заявки, он присылает сигнал msg_select_next_job. При обработке этого сигнала нужно либо отослать performer-у очередное сообщение, либо вернутся в состояние st_performer_is_free, если заявок для обработки больше нет.

void
evt_select_next_job()
{
   if( m_pending_requests.empty() )
      // Поскольку ожидающих своей очереди запросов нет,
      // то остается просто вернуться в начальное состояние.
      this >>= st_performer_is_free;
   else
   {
      // Нужно оставаться в состоянии st_performer_is_busy
      // и отосылать агенту-performer-у следующую заявку
      // из очереди.

      auto & request = m_pending_requests.top();
      m_performer_mbox->deliver_message( request );
      m_pending_requests.pop();
   }
}

Агент-performer самый простой. Его код можно привести сразу целиком:

class a_performer_t : public so_5::rt::agent_t
{
public :
   a_performer_t(
      so_5::rt::environment_t & env,
      const so_5::rt::mbox_t & collector_mbox )
      :  so_5::rt::agent_t( env )
      ,  m_collector_mbox( collector_mbox )
   {}

   virtual void
   so_define_agent() override
   {
      so_default_state().event( &a_performer_t::evt_request );
   }

private :
   const so_5::rt::mbox_t m_collector_mbox;

   void
   evt_request( const msg_request & evt )
   {
      const std::time_t started_at = std::time(nullptr);

      // Имитация какой-то напряженной работы.
      // Текущая рабочая нить будет заблокирована на 4 секунды.
      std::this_thread::sleep_for( std::chrono::seconds(4) );

      // Ответ на запрос отсылается тому, кто инициировал запрос.
      so_5::send< msg_positive_reply >(
            evt.m_reply_to,
            evt.m_id,
            "-=<" + evt.m_id + ">=-",
            started_at );

      // Агент-collector должен узнать о нашей готовности к обработке
      // следующей заявки.
      so_5::send< a_collector_t::msg_select_next_job >( m_collector_mbox );
   }
};

В реализации performer-а можно отметить один момент, который будет важен при обсуждении диспетчеров чуть ниже: блокирование рабочей нити, на которой запускается evt_request посредством выполнения sleep_for. Это означает, что пока агент-performer спит на этом вызове, рабочая нить не может использоваться для выполнения других операций.

Пожалуй, это все, что касается самих агентов из примера. Остается рассмотреть еще два момента. Во-первых, как запускается SObjectizer и, во-вторых, как создаются и регистрируются описанные выше агенты. Ну и несколько связанных с этим вопросов.

SObjectizer запускается посредством функции so_5::launch(). В SObjectizer-е есть несколько ее вариантов, в примере используется вариант, получающий два аргумента. Первый аргумент -- это функция за запуска стартовой кооперации. Второй аргумент -- это функция для настройки параматров SObjectizer-а. Вот как запуск SObjectizer выглядит в main-е:

int
main( int argc, char ** argv )
{
   try
   {
      so_5::launch(
         create_coop,
         []( so_5::rt::environment_params_t & params )
         {
            params.add_named_dispatcher( "thread_pool",
                  so_5::disp::thread_pool::create_disp( 3 ) );
         } );

      return 0;
   }
   catchconst std::exception & x )
   {
      std::cerr << "*** Exception caught: " << x.what() << std::endl;
   }

   return 2;
}

Информирование об ошибках в SObjectizer базируется на исключениях, поэтому обращение к launch() обрамлено блоком try-catch.

Первым аргументом в launch() передается функция create_coop(), которая будет вызвана SObjectizer-ом в нужный момент. Подробнее create_coop() рассматривается ниже. Вторым аргументом в launch() передается лямбда-функция, осуществляющая настройку конкретного экземпляра SObjectizer. Возврат из launch() осуществляется когда запущенный внутри launch() экземпляр SObjectizer-а завершит свою работу.

В приложении можно создать и одновременно запустить множество экземпляров SObjectizer-а. Каждый из них может иметь разные настройки, разный набор коопераций и агентов. Экземпляр SObjectizer-а -- это объект, реализующий интерфейс environment_t. Такой объект создается внутри launch() и далее ссылка на него будет передаваться в стартовую функцию, а оттуда во все создаваемые пользователем агенты. Каждый агент должен быть привязан к одному environment-у, именно поэтому в конструкторах всех агентов примера первый аргумент -- это ссылка на environment.

Единственная настройка параметров SObjectizer-а в данном примере -- это создание экземпляра thread_pool-диспетчера. Диспетчеры в SObjectizer управляют рабочими нитями, т.е. диспетчер отвечает за то, на какой конкретно нити конкретный агент будет обрабатывать свои события.

SObjectizer предоставляет несколько готовых типов диспетчеров. В данном примере используется диспетчер, который создает пул рабочих нитей и распределяет агентов по ним. Поскольку в примере используется три агента, причем агент-performer выполняет длительные операции, блокирующие рабочую нить, thread_pool-диспетчеру предписывается создать три рабочие нити. Это позволит иметь две свободные нити для событий агентов generator и collector пока агент-performer спит на третьей рабочей нити.

Теперь можно перейти к т.н. "стартовой функции", которая в данном примере представлена функцией create_coop.

Смысл наличия стартовой функции в том, что при запуске SObjectizer-у нужно выполнить ряд действий по инициализации своих внутренних ресурсов (например, создание и запуск диспетчера по-умолчанию и таймерной нити). И только после завершения этих действий можно дать пользователю возможность создавать своих агентов. Как раз за это и отвечает стартовая функция -- пользователь оформляет запуск необходимых ему агентов в виде функции, передает ее SObjectizer-у, а SObjectizer в нужный момент ее вызывает.

Пришло время посмотреть, что из себя представляет стартовая функция в этом примере:

void
create_coop( so_5::rt::environment_t & env )
{
   using namespace so_5::disp::thread_pool;

   auto c = env.create_coop( so_5::autoname,
         create_disp_binder( "thread_pool",
            []( params_t & p ) { p.fifo( fifo_t::individual ); } ) );

   std::unique_ptr< a_collector_t > collector{ new a_collector_t{ env } };
   std::unique_ptr< a_performer_t > performer{
         new a_performer_t{ env, collector->so_direct_mbox() } };
   collector->set_performer_mbox( performer->so_direct_mbox() );

   std::unique_ptr< a_generator_t > generator{
         new a_generator_t{ env, collector->so_direct_mbox() } };

   c->add_agent( std::move( collector ) );
   c->add_agent( std::move( performer ) );
   c->add_agent( std::move( generator ) );

   env.register_coop( std::move( c ) );
}

Ничего особенного, обычные рутинные операции.

Сначала создается экземпляр кооперации, в которую войдут агенты примера:

   auto c = env.create_coop( so_5::autoname,
         create_disp_binder( "thread_pool",
            []( params_t & p ) { p.fifo( fifo_t::individual ); } ) );

Каждая коперация в SObjectizer должна иметь уникальное имя. Это имя можно придумать самому, а можно попросить SObjectizer сделать это вместо нас. Здесь мы как раз просим об этом SObjectizer -- первый аргумент so_5::autoname -- это как раз такая просьба.

А вот второй аргумент для create_coop() несколько сложнее. Есть в SObjectizer такое понятие, как dispatcher binding. Т.е. связь агента с диспетчером, который будет обслуживать агента, т.е. предоставлять рабочие нити для агента.

Каждый агент должен быть связан с каким-то диспетчером. Назначать эту связь можно разными способами. Можно вообще не указывать явно никакой связи. Тогда SObjectizer сам свяжет агента с диспетчером по-умолчанию (все такие агенты будут работать на одной общей рабочей нити). Можно явным образом задавать dispatcher binding для каждого агента. Но, если все агенты кооперации должны работать на одном и том же диспетчере, то явное указание связи для каждого агента кооперации -- это утомительно. Поэтому можно задать связь для кооперации. И эта связь распространится на агентов кооперации.

Именно этот подход и используется здесь. Т.к. все агенты должны работать на thread_pool-диспетчере, то связь с этим диспетчером задается при создании кооперации. А все агенты, которые затем добавляются в кооперацию, будут использовать эту связь.

Небольшая техническая подробность, связанная с вызовом create_disp_binder: в обсуждаемом примере для каждого агента кооперации должна поддерживаться собственная очередь сообщений, что позволит диспетчеру распределять агентов по разным нитям из своего пула. Поэтому при создании binding-а это явным образом указывается при задании параметра fifo в виде значения fifo_t::individual.

Ну а последующие действия в create_coop тривиальны: создаются агенты, затем они добавляются в кооперацию, после чего кооперация регистрируется. Вот и все. Остальное уже дело SObjectizer-а.

Отметить можно разве лишь то, что агенты должны быть динамически созданными объектами, время жизни которых будет контролироваться SObjectizer-ом (SObjectizer сам определяет, когда агентов можно безопасно уничтожать после полной дерегистрации кооперации, которой они принадлежат). И это единственное место в обсуждаемом коде, где пользователь вынужден вручную вызвать оператор new. Да и то только потому, что в C++11 нет вспомогательной функции std::make_unique, которую добавили в C++14. Так что после перевода SObjectizer на C++14 необходимости в ручном вызове new будет еще меньше.

Почему внимание было заострено на количестве вызовов new в данном примере? Потому, что управление динамической памятью -- это одна из самых больших проблем в C++. Еще острее эта проблема становится в многопоточных приложениях. А вот в чем большой плюс SObjectizer-а, так это в том, что очень многие вопросы по работе с созданными динамически объектами, вроде агентов и сообщений, SObjectizer берет на себя, тем самым существенно упрощая жизнь пользователю.


Пожалуй, на более низкий уровень детализации опускаться сейчас нет смысла. Относительно небольшой пример продемонстрировал использование большинства ключевых особенностей SObjectizer: диспетчер, кооперация, агент с состояниями, сообщения и сигналы, перепосылка сообщений и отложенные сообщения. Это базовые вещи, кирпичики, из которых строятся гораздо более сложные и большие SObjectizer-приложения.

Само же решение проблемы контроля дедлайнов в данном примере очень простое. В реальном приложении наверняка пришлось бы делать более сложную реализацию. Например, с отменой ставших не нужными отложенными сигналами msg_check_deadline при передаче запроса из очереди агенту-performer-у. И, наверняка, в реальном прикладном коде приоритетная очередь запросов учитывала бы больше факторов для правильного упорядочения отложенных заявок. Но все эти вещи уже выходят за рамки простого примера.

Исходный код примера находится сейчас в репозитории SObjectizer-а в рабочей ветке. Для его работы необходима версия 5.5.2, которую можно загрузить с SourceForge. В официальный релиз пример simple_message_deadline будет включен при выпуске версии 5.5.3 (или 5.6.0), ориентировочно в феврале-марте 2015.

Отправить комментарий