В продолжение темы, начатой заметками "Реализация дедлайнов для сообщений своими руками" и "Агент-collector + агент-performer = паттерн для нескольких ситуаций", в SObjectizer добавлен еще один пример simple_message_deadline. Оказалось, что в данном примере задействуется много ключевых возможностей SObjectizer-а, поэтому имеет смысл разобрать пример подробнее, объясняя, по ходу рассказа, что к чему и почему. Рассказ я постарался сделать максимально простым и подробным, ориентированным на читателя, который либо вообще не имеет представления о SObjectizer-е, либо имеет самое поверхностное.
Что делает пример? Один агент-generator инициирует запросы, у каждого из которых есть дедлайн. Т.е. для запроса задано время, до наступления которого обработка запроса должна быть начата. Если этого не случилось, то запрос должен быть отвергнут.
Агент-collector, который собирает запросы от агента-generator-а в очередь и по одному отдает их агенту-performer-у. Агент-performer имитирует обработку запросов, затрачивая по четыре секунды на каждый полученный им запрос. За то время, пока агент-performer "занимается" обработкой, у части сохраненных агентом-collector-ом запросов может наступить дедлайн. Это контролируется агентом-collector-ом, который выбрасывает просроченные запросы из очереди.
Вот такая незамысловатая логика работы. Теперь можно перейти к разбору того, как это реализовано в коде.
Сначала нужно описать прикладные сообщения, которые будут ходить между агентами. В SObjectizer каждое сообщение должно иметь свой собственный уникальный тип, т.к. диспетчеризация и обработка сообщений происходит по их типу.
Первое сообщение -- это запрос, который идет от generator-а к collector-у, а затем и к performer-у:
// Запрос, который будет отсылаться генератором. struct msg_request : public so_5::rt::message_t { std::string m_id; std::time_t m_deadline; const so_5::rt::mbox_t m_reply_to; msg_request( std::string id, std::time_t deadline, const so_5::rt::mbox_t & reply_to ) : m_id( std::move( id ) ) , m_deadline( deadline ) , m_reply_to( reply_to ) {} }; |
Сообщение содержит минимальное количество полей, необходимых для работы примера. Некий идентификатор, который поможет сопоставлять запросы и ответы. Дедлайн, до наступления которого запрос должен поступить на обработку. И обратный адрес, на который должен быть отослан ответ на запрос (в SObjectizer сообщения отсылаются в "почтовые ящики", называемые mbox-ами, а ссылки на эти почтовые ящики хранятся в виде объектов mbox_t).
Далее идет определение удобного псевдонима, который позволит в некоторых местах писать более лаконично:
// Удобный псевдоним умного указателя на экземпляр запроса. using msg_request_smart_ptr_t = so_5::intrusive_ptr_t< msg_request >; |
Нужен этот псевдоним вот для чего. Сообщения в SObjectizer отсылаются в виде динамически созданных объектов. Время жизни экземпляра сообщения контролируется SObjectizer-ом посредством использования интрузивного умного указателя intrusive_ptr_t. Экземпляр сообщения автоматически уничтожается после того, как исчезнут все ссылки на него. Т.е. когда сообщение будет обработано всеми подписчиками.
В данном примере агент-collector получив сообщение msg_request от generator-а должен сохранить экземпляр сообщения у себя на какое-то время. Сделать это можно либо просто скопировав сообщение (т.е. создав новый экземпляр), либо же сохранив у себя ссылку на уже имеющийся экземпляр посредством умного указателя. Второй подход более эффективен, т.к. не требует повторного выделения памяти под msg_request. Именно поэтому он используется агентом-collector-ом. А чтобы не писать intrusive_ptr_t<msg_request> вводится более удобное имя msg_request_smart_ptr_t.
Далее необходимы ответные сообщения, которые будут сообщать generator-у результат обработки запроса. Таких сообщений два. Одно используется для информирования об успешной обработке запроса. Второе -- о неудачной.
// Результат успешной обработки запроса. struct msg_positive_reply : public so_5::rt::message_t { std::string m_id; std::string m_result; std::time_t m_started_at; msg_positive_reply( std::string id, std::string result, std::time_t started_at ) : m_id( std::move( id ) ) , m_result( std::move( result ) ) , m_started_at( started_at ) {} }; // Уведомление о том, что запрос обработан не был. struct msg_negative_reply : public so_5::rt::message_t { std::string m_id; std::time_t m_deadline; msg_negative_reply( std::string id, std::time_t deadline ) : m_id( std::move( id ) ) , m_deadline( deadline ) {} }; |
В сообшении msg_positive_reply поле m_started_at указывает время, когда сообщение поступило на обработку. В сообщении msg_negative_reply поле m_deadline -- это копия значения поля m_deadline из msg_request. Оно нужно лишь для того, чтобы проще было убедиться в том, что дедлайн для сообщения действительно наступил.
То, что все показанные выше сообщения являются наследниками so_5::rt::message_t пока оставим без объяснений, а чуть ниже разберем этот вопрос чуть подробнее, когда появится использование еще одного базового типа so_5::rt::signal_t.
После объявления всех необходимых прикладных сообщений можно определять агента-generator-а. В данном примере у него тривиальная логика поведения: при старте он отсылает агенту-collector-у серию сообщений msg_request с разными дедлайнами, а затем ждет ответов на свои вопросы. Когда все ответы получены, работа примера завершается.
Для отсылки запросов collector-у нужно знать mbox collector-а. Этот mbox передается генератору в конструктор и сохраняется в виде атрибута агента-generator-а:
class a_generator_t : public so_5::rt::agent_t { public : a_generator_t( so_5::rt::environment_t & env, const so_5::rt::mbox_t & processor_mbox ) : so_5::rt::agent_t( env ) , m_processor_mbox( processor_mbox ) {} |
Название processor_mbox, а не, скажем, collector_mbox, было выбрано потому, что агент-generator не знает, что за обработку его запросов будет отвечать пара агентов (collector+performer). Для генератора есть только mbox, на который нужно отослать запрос. Скрывается ли за этим mbox-ом один обработчик, два, три или больше -- уже неважно. Поэтому агент-generator ситает, что есть некий processor для его запросов, mbox которого и передается агенту-generator-у в конструкторе.
Для того, чтобы получать ответные сообщения агент-generator должен подписаться на них. У агентов в SObjectizer-е есть специальный метод so_define_agent(), который предназначен для выполнения подписки на интересующие агента сообщения. Агент-generator подписывает два своих события (можно сказать, что событие -- это метод-обработчик, который вызывается у агента для реакции на сообщение):
virtual void so_define_agent() override { so_default_state() .event( &a_generator_t::evt_positive_reply ) .event( &a_generator_t::evt_negative_reply ); } |
Агент-generator обладает очень простой логикой, поэтому ему не нужно менять свое состояние и оба своих события агент подписывает для состояния по-умолчанию, которое есть у всех SObjectizer-овских агентов. Метод so_default_state() возвращает ссылку на соответствующий объект-состояние, а методы event() этого объекта выполняют подписку событий, реализованных методами evt_positive_reply() и evt_negative_reply().
В SObjectizer существует несколько вариантов метода event() для подписки событий агента. В данном случае используется самый минималистичный и простой вариант -- события агента подписываются на сообщения, которые адресованы персонально этому агенту-generator-у. Тип сообщения, которое должно приводить к возникновению события, определяется автоматически на основании сигнатуры метода-события. В данном случае сигнатуры следующие:
void evt_positive_reply( const msg_positive_reply & evt ); void evt_negative_reply( const msg_negative_reply & evt ); |
Что позволяет SObjectizer-у самостоятельно определить, что событие evt_positive_reply должно возникать когда агенту присылают сообщение msg_positive_reply, а событие evt_negative_reply -- когда присылают сообщение msg_negative_reply.
Для упрощения примера все заявки агент-generator рассылает в начале своей работы. У агентов в SObjectizer-е есть метод so_evt_start(), который SObjectizer автоматически вызывает при запуске агента. Агент-generator использует этот метод для того, чтобы отослать серию запросов на processor_mbox:
virtual void so_evt_start() override { // Информация для вычисления дедлайнов отсылаемых сообщений. unsigned int delays[] = { 1, 4, 5, 3, 9, 15, 12 }; const std::time_t now = std::time(nullptr); int i = 0; for( auto d : delays ) { // Создаем идентификатор сообщения. std::ostringstream idstream; idstream << "i=" << i << ";d=" << d; const std::string id = idstream.str(); // Вычисляем дедлайн сообщения. const std::time_t deadline = now + d; // Отсылаем запрос на обработку. so_5::send< msg_request >( m_processor_mbox, id, deadline, so_direct_mbox() ); std::cout << "sent: [" << id << "], deadline: " << time_to_string( deadline ) << std::endl; // Количество отосланных запросов увеличилось, // должно увеличится количество ожидаемых ответов. ++m_expected_replies; ++i; } } |
Внутри метода so_evt_start() к SObjectizer-у имеет отношение только шаблонная функция so_5::send. Она используется для конструирования и отсылки сообщения типа msg_request. Первым параметром для send-а является mbox, на который сообщение должно быть отослано. Остальные параметры передаются в конструктор нового экземпляра msg_request. Этот экземпляр будет сконструирован внутри send, пользователю не нужно делать этого вручную.
Последним параметром конструктора msg_request (и, соответственно, последним аргументом при вызове send) является mbox, на который должен придти ответ. В качестве этого mbox-а указывается персональный mbox агента-generator-а. Каждый агент имеет свой личный mbox, доступ к этому mbox-у осуществляется посредством метода so_direct_mbox(). Название direct_mbox используется потому, что в SObjectizer есть два типа mbox-ов. Самый простой из них реализует модель multi-producer/single-consumer, т.е. сообщение в mbox может отослать кто угодно, а вот получить сообщение из mbox-а может только единственный агент-владелец. Таким образом mbox подобного типа является прямым каналом для связи с агентом, отсюда и название direct_mbox. Есть так же и тип multi-producer/multi-consumer mbox-а, но в этом примере он не используется.
Итак, агент-generator в so_evt_start() отсылает серию запросов после чего SObjectizer будет вызывать события агента-generator-а при поступлении ответных сообщений. В обработчиках ответов агент-generator не делает ничего особенного, просто печатает на консоль то, что к нему поступает. Внимание можно обратить разве что на подсчет количества полученных ответов и на действие, которое агент предпринимает, когда все ответы получены:
void count_reply() { --m_expected_replies; if( !m_expected_replies ) so_deregister_agent_coop_normally(); } |
Когда все ответы до агента дошли, агент инициирует дерегистрацию кооперации, в которую он входит. Кооперация -- это совокупность агентов, которые совместно выполняют какую-то работу. В данном случае в кооперацию входит три агента: generator, collector и performer. Для данного примера нужны все три агента, взаимодействующие друг с другом, каждый по отдельности не имеет особого смысла. Следовательно, в начале работы все три агента должны одновременно появиться, а когда действия примера завершаются, все три агента должны исчезнуть.
Именно поэтому агенты объеденены в кооперацию: они вместе появляются внутри SObjectizer-а, вместе и покидают SObjectizer. Вызов метода so_deregister_agent_coop_normally() как раз дает команду изъять кооперацию и удалить входящих в нее агентов. При дерегистрации кооперации нужно указать причину -- выполняется ли дерегистрация в соответствии с нормальной логикой работы или же из-за ошибки. В данном случае дерегистрация -- это нормальное действие, поэтому используется метод so_deregister_agent_coop_normally().
В данном примере регистрируется всего одна кооперация. Когда она дерегистрируется, SObjectizer понимает, что работающих агентов больше не осталось, и завершает свою работу.
Самый сложный агент в данном примере -- это агент-collector. Перед тем, как рассматривать его код, имеет смысл объяснить логику его работы.
Агент-collector получает запросы от generator-а и переадресует их агенту-performer-у. Т.к. запросы могут приходить быстрее, чем performer будет их обрабатывать, агенту-collector-у приходится сохранять запросы у себя. Для чего используется приоритетная очередь. Запросы с наиболее близким дедлайном получают наивысший приоритет и оказываются в начале очереди.
Запросы пересылаются агенту-performer-у. Когда performer завершает обработку очередного запроса, он информирует агента-collector-а об этом посредством уведомления msg_select_next_job. Когда агент-collector получает msg_select_next_job, он берет первую заявку из очереди и пересылает ее агенту-performer-у.
У каждого сообщения есть дедлайн и агент-collector должен контролировать дедлайны. Для этого при сохранении сообщения в очереди агент-collector взводит таймер. При срабатывании таймера collector получит сигнал и сможет выбросить из очереди сообщение, дедлайн которого натупил (если оно еще есть в очереди).
При получении msg_request агент-collector может оказаться в двух ситуациях:
- агент-performer свободен и новый msg_request может быть сразу отослан на обработку;
- агент-performer занят обработкой ранее переданного ему запроса и новый msg_request должен быть сохранен в очереди.
В коде агента-collector-а эти две ситуации выражаются явно путем объявления двух состояний для агента:
const so_5::rt::state_t st_performer_is_free = so_make_state(); const so_5::rt::state_t st_performer_is_busy = so_make_state(); |
Состояние определяет, какие сообщения посредством каких событий будут обрабатываться, когда агент находится в этом состоянии. Это показывается в методе so_define_agent() агента-collector-а при подписке агента на сообщения:
virtual void so_define_agent() override { this >>= st_performer_is_free; st_performer_is_free .event( &a_collector_t::evt_first_request ); st_performer_is_busy .event( &a_collector_t::evt_yet_another_request ) .event< msg_select_next_job >( &a_collector_t::evt_select_next_job ) .event< msg_check_deadline >( &a_collector_t::evt_check_deadline ); } |
Здесь видно, что в состоянии st_perfomer_is_free агент реагирует только на одно событие -- получение нового msg_request. Обработка этого события выполняется методом evt_first_request.
А вот в состоянии st_performer_is_busy агенту нужно обрабатывать больше событий. Событие evt_yet_another_request отвечает за обработку очередного msg_request. Событие evt_select_next_job -- за выдачу первого msg_request из внутренней очереди освободившемуся агенту-performer-у. Событие evt_check_deadline -- за проверку наступления дедлайнов для хранящихся во внутренней очереди запросов.
Почему в состоянии st_performer_is_busy выполняется больше действий, чем в состоянии st_performer_is_free? Потому, что когда агент-performer свободен, очередь заявок в collector-е пуста из-за чего нет надобности реагировать на просьбу выдать следующий запрос (очередь пуста, выдавать нечего). Как и нет надобности проверять дедлайны -- очередь пуста, в ней нет заявок. Поэтому msg_select_next_job и msg_check_deadline в состоянии st_performer_is_free просто игнорируются.
Первой же операцией в so_define_agent() является перевод агента-collector-а в состояние st_performer_is_free. Это обязательно нужно сделать, т.к. изначально агент находится в состоянии по-умолчанию. И, если агента не перевести из него в состояние st_performer_is_free, то агент-collector не будет работать, т.к. подписки сделаны для st_performer_is_free и st_performer_is_busy, а не для состояния по-умолчанию.
Глядя на код so_define_agent() можно увидеть, что в нем используются две формы методов event() для подписки событий. В одном случае event() получает только указатель на метод-обработчик события. Этот случай уже рассматривался выше для агента-generator-а. Во втором случае параметром шаблона задается еще и имя типа сообщения. Этот вариант event() нужен для случая, когда по сигнатуре метода-обработчика нельзя определить, какое сообщение приводит к возникновению события. Действительно, методы evt_select_next_job() и evt_check_deadline() не имеют аргументов вообще:
void evt_select_next_job(); void evt_check_deadline(); |
Все дело в том, что в SObjectizer-е различаются два типа сообщений. Первый тип -- это собственно полноценные сообщения, которые переносят в себе информацию. Примерами являются сообщения msg_request, msg_positive_reply и msg_negative_reply. Методы-обработчики таких сообщений должны получать экземпляр сообщения в качестве параметра, иначе смысла в обработчике нет. Поэтому при подписке обработчика полноценного сообщения в методе event() нет необходимости указывать тип сообщения явным образом -- он выводится автоматически.
Второй тип -- это сигналы. Т.е. это сообщения, которые не переносят никакой дополнительной информации кроме факта собственного существования. Пример -- это уведомление msg_select_next_job от агента-performer-а. Важен сам факт того, что агент закончил с текущей заявкой и готов перейти к следующей. Поэтому больше никакой информации с msg_select_next_job не связано.
Внутри SObjectizer-а механизмы доставки сообщений и сигналов несколько отличаются друг от друга. Сообщение обязательно должно быть представлено каким-то объектом (экземпляром), ссылка на который передается в событие. А вот для сигнала никаких экземпляров нет, есть только информация о сигнале конкретного типа. Поэтому в метод-обработчик передавать нечего. Отсюда и формат обработчиков сигналов -- метод без параметров. А раз метод-обработчик не имеет параметров, то вывести тип сигнала-инцидента нет возможности и пользователю нужно указывать тип сигнала явно:
st_performer_is_busy .event( &a_collector_t::evt_yet_another_request ) .event< msg_select_next_job >( &a_collector_t::evt_select_next_job ) .event< msg_check_deadline >( &a_collector_t::evt_check_deadline ); |
Различия между сообщениями и сигналами еще и в том, что тип для сообщения с данными должен быть наследником so_5::rt::message_t. А тип для сигнала должен наследоваться от so_5::rt::signal_t. Что можно видеть в коде агента-collector-а:
struct msg_select_next_job : public so_5::rt::signal_t {}; struct msg_check_deadline : public so_5::rt::signal_t {}; |
Такое требование с наследованием либо от message_t, либо от signal_t, позволяет SObjectizer-у контролировать корректность действий пользователя. Так, если тип сообщения отнаследован от message_t, то SObjectizer будет гарантировать, что в обработчик сообщения будет передана корректная ссылка и что пользователь не сможет отослать в качестве сообщения нулевой указатель.
Если посмотреть на сигнатуры методов-обработчиков в агенте-generator-е и агенте-collector-е, то можно заметить, что они существенно отличаются. В первом случае обработчик получает ссылку на экземпляр сообщения:
void evt_positive_reply( const msg_positive_reply & evt ); void evt_negative_reply( const msg_negative_reply & evt ); |
Тогда как во втором случае -- ссылку на вспомогательный SObjectizer-овский объект event_data_t:
void
evt_first_request( const so_5::rt::event_data_t< msg_request > & evt ) void evt_yet_another_request( const so_5::rt::event_data_t< msg_request > & evt ) |
Первая форма является наиболее общеупотребительной и удобной -- обработчик события просто получает ссылку на экземпляр сообщения-инцидента. Вторая форма используется, в основном, в случаях, когда нужно иметь возможность сохранить указатель на экземпляр сообщения-инцидента. Именно это и нужно агенту-collector-у. Для того, чтобы не пересоздавать сообщение msg_request заново, агент-collector использует тот же самый экземпляр сообщения (при перепосылке его агенту-performer-у и при сохранении msg_request-а в своей очереди). Для получения умного указателя на экземпляр сообщения-инцидента используется метод make_reference() класса event_data_t.
Реакция на msg_request в состоянии st_performer_is_free (т.е. когда агент-perfomer не занят обработкой запросов) тривиальна -- нужно просто сменить состояние на st_performer_is_busy и переслать этот же экземпляр сообщения агенту-performer-у:
void evt_first_request( const so_5::rt::event_data_t< msg_request > & evt ) { this >>= st_performer_is_busy; m_performer_mbox->deliver_message( evt.make_reference() ); } |
Здесь для перепосылки того же самого экземпляра сообщения используется метод deliver_message(), а не функция send(). В текущей версии SObjectizer-а шаблонные функции send() не поддерживают операцию перепосылки, поэтому задействуется метод mbox-а. Вообще-то шаблонные функции send() -- это всего лишь простые вспомогательные обертки, которые конструируют новый экземпляр сообщения, а затем передают его в mbox-овский deliver_message(). Изначально функций send() вообще не было, но со временем они были добавлены для упрощения жизни разработчикам, в особенности при написании кода внутри C++ных шаблонов.
Реакция на msg_request в состоянии st_performer_is_busy поинтереснее:
void evt_yet_another_request( const so_5::rt::event_data_t< msg_request > & evt ) { // Агент-performer занят, поэтому запрос должен быть // сохранен в очереди, а дедлайн запроса нужно проконтролировать. const std::time_t now = std::time(nullptr); if( now < evt->m_deadline ) { m_pending_requests.push( evt.make_reference() ); // Для простоты на каждый сохраненный в очереди запрос // генерируем таймерный сигнал. so_5::send_delayed_to_agent< msg_check_deadline >( *this, std::chrono::seconds( evt->m_deadline - now ) ); } else { // Дедлайн для сообщения наступил пока сообщение шло к нам. // Поэтому вместо обработки сразу же отсылается // отрицательный ответ. send_negative_reply( *evt ); } } |
Новый запрос сохраняется в очереди. И сразу же агент-collector отсылает себе отложенное сообщение-сигнал для контроля дедлайна сообщения. На всякий случай делается проверка того, что дедлайн еще впереди, дабы предотвратить ситуацию, когда дедлайн уже прошел, но мы этого не заметили и получили отрицательный результат (evt->m_deadline-now).
Для инициации отложенного сигнала используется еще один вариант шаблонной функции send() под названием send_delayed_to_agent(). Эта функция делает отсылку сообщения на персональный mbox указанного ей агента, но не сиюминутную отсылку, а отложенную на указанное количество секунд. SObjectizer поддерживает отложенные и периодические сообщения. Причем SObjectizer может обслуживать десятки и сотни миллионов таких сообщений. Так что создание отдельного таймера для каждого сохраненного во очереди запроса -- это вполне нормальный подход.
В данном примере отслеживание дедлайнов выполняется по самой простой схеме. Для каждой сохраненной в очереди заявки выставляется таймер. Когда таймер срабатывает, агент проверяет первую заявку в очереди на предмет наступления дедлайна. Если дедлайн для заявки наступил (т.е. текущее время оказалось больше времени дедлайна), то заявка из очереди выбрасывается. После чего проверяется новая первая заявка в очереди и т.д. Все это можно увидеть непосредственно в коде обработчика таймерного события:
void evt_check_deadline() { const std::time_t now = std::time(nullptr); while( !m_pending_requests.empty() ) { auto & request = m_pending_requests.top(); if( now >= request->m_deadline ) { send_negative_reply( *request ); m_pending_requests.pop(); } else break; } } |
Когда агент-performer завершает обработку заявки, он присылает сигнал msg_select_next_job. При обработке этого сигнала нужно либо отослать performer-у очередное сообщение, либо вернутся в состояние st_performer_is_free, если заявок для обработки больше нет.
void evt_select_next_job() { if( m_pending_requests.empty() ) // Поскольку ожидающих своей очереди запросов нет, // то остается просто вернуться в начальное состояние. this >>= st_performer_is_free; else { // Нужно оставаться в состоянии st_performer_is_busy // и отосылать агенту-performer-у следующую заявку // из очереди. auto & request = m_pending_requests.top(); m_performer_mbox->deliver_message( request ); m_pending_requests.pop(); } } |
Агент-performer самый простой. Его код можно привести сразу целиком:
class a_performer_t : public so_5::rt::agent_t { public : a_performer_t( so_5::rt::environment_t & env, const so_5::rt::mbox_t & collector_mbox ) : so_5::rt::agent_t( env ) , m_collector_mbox( collector_mbox ) {} virtual void so_define_agent() override { so_default_state().event( &a_performer_t::evt_request ); } private : const so_5::rt::mbox_t m_collector_mbox; void evt_request( const msg_request & evt ) { const std::time_t started_at = std::time(nullptr); // Имитация какой-то напряженной работы. // Текущая рабочая нить будет заблокирована на 4 секунды. std::this_thread::sleep_for( std::chrono::seconds(4) ); // Ответ на запрос отсылается тому, кто инициировал запрос. so_5::send< msg_positive_reply >( evt.m_reply_to, evt.m_id, "-=<" + evt.m_id + ">=-", started_at ); // Агент-collector должен узнать о нашей готовности к обработке // следующей заявки. so_5::send< a_collector_t::msg_select_next_job >( m_collector_mbox ); } }; |
В реализации performer-а можно отметить один момент, который будет важен при обсуждении диспетчеров чуть ниже: блокирование рабочей нити, на которой запускается evt_request посредством выполнения sleep_for. Это означает, что пока агент-performer спит на этом вызове, рабочая нить не может использоваться для выполнения других операций.
Пожалуй, это все, что касается самих агентов из примера. Остается рассмотреть еще два момента. Во-первых, как запускается SObjectizer и, во-вторых, как создаются и регистрируются описанные выше агенты. Ну и несколько связанных с этим вопросов.
SObjectizer запускается посредством функции so_5::launch(). В SObjectizer-е есть несколько ее вариантов, в примере используется вариант, получающий два аргумента. Первый аргумент -- это функция за запуска стартовой кооперации. Второй аргумент -- это функция для настройки параматров SObjectizer-а. Вот как запуск SObjectizer выглядит в main-е:
int main( int argc, char ** argv ) { try { so_5::launch( create_coop, []( so_5::rt::environment_params_t & params ) { params.add_named_dispatcher( "thread_pool", so_5::disp::thread_pool::create_disp( 3 ) ); } ); return 0; } catch( const std::exception & x ) { std::cerr << "*** Exception caught: " << x.what() << std::endl; } return 2; } |
Информирование об ошибках в SObjectizer базируется на исключениях, поэтому обращение к launch() обрамлено блоком try-catch.
Первым аргументом в launch() передается функция create_coop(), которая будет вызвана SObjectizer-ом в нужный момент. Подробнее create_coop() рассматривается ниже. Вторым аргументом в launch() передается лямбда-функция, осуществляющая настройку конкретного экземпляра SObjectizer. Возврат из launch() осуществляется когда запущенный внутри launch() экземпляр SObjectizer-а завершит свою работу.
В приложении можно создать и одновременно запустить множество экземпляров SObjectizer-а. Каждый из них может иметь разные настройки, разный набор коопераций и агентов. Экземпляр SObjectizer-а -- это объект, реализующий интерфейс environment_t. Такой объект создается внутри launch() и далее ссылка на него будет передаваться в стартовую функцию, а оттуда во все создаваемые пользователем агенты. Каждый агент должен быть привязан к одному environment-у, именно поэтому в конструкторах всех агентов примера первый аргумент -- это ссылка на environment.
Единственная настройка параметров SObjectizer-а в данном примере -- это создание экземпляра thread_pool-диспетчера. Диспетчеры в SObjectizer управляют рабочими нитями, т.е. диспетчер отвечает за то, на какой конкретно нити конкретный агент будет обрабатывать свои события.
SObjectizer предоставляет несколько готовых типов диспетчеров. В данном примере используется диспетчер, который создает пул рабочих нитей и распределяет агентов по ним. Поскольку в примере используется три агента, причем агент-performer выполняет длительные операции, блокирующие рабочую нить, thread_pool-диспетчеру предписывается создать три рабочие нити. Это позволит иметь две свободные нити для событий агентов generator и collector пока агент-performer спит на третьей рабочей нити.
Теперь можно перейти к т.н. "стартовой функции", которая в данном примере представлена функцией create_coop.
Смысл наличия стартовой функции в том, что при запуске SObjectizer-у нужно выполнить ряд действий по инициализации своих внутренних ресурсов (например, создание и запуск диспетчера по-умолчанию и таймерной нити). И только после завершения этих действий можно дать пользователю возможность создавать своих агентов. Как раз за это и отвечает стартовая функция -- пользователь оформляет запуск необходимых ему агентов в виде функции, передает ее SObjectizer-у, а SObjectizer в нужный момент ее вызывает.
Пришло время посмотреть, что из себя представляет стартовая функция в этом примере:
void create_coop( so_5::rt::environment_t & env ) { using namespace so_5::disp::thread_pool; auto c = env.create_coop( so_5::autoname, create_disp_binder( "thread_pool", []( params_t & p ) { p.fifo( fifo_t::individual ); } ) ); std::unique_ptr< a_collector_t > collector{ new a_collector_t{ env } }; std::unique_ptr< a_performer_t > performer{ new a_performer_t{ env, collector->so_direct_mbox() } }; collector->set_performer_mbox( performer->so_direct_mbox() ); std::unique_ptr< a_generator_t > generator{ new a_generator_t{ env, collector->so_direct_mbox() } }; c->add_agent( std::move( collector ) ); c->add_agent( std::move( performer ) ); c->add_agent( std::move( generator ) ); env.register_coop( std::move( c ) ); } |
Ничего особенного, обычные рутинные операции.
Сначала создается экземпляр кооперации, в которую войдут агенты примера:
auto c = env.create_coop( so_5::autoname, create_disp_binder( "thread_pool", []( params_t & p ) { p.fifo( fifo_t::individual ); } ) ); |
Каждая коперация в SObjectizer должна иметь уникальное имя. Это имя можно придумать самому, а можно попросить SObjectizer сделать это вместо нас. Здесь мы как раз просим об этом SObjectizer -- первый аргумент so_5::autoname -- это как раз такая просьба.
А вот второй аргумент для create_coop() несколько сложнее. Есть в SObjectizer такое понятие, как dispatcher binding. Т.е. связь агента с диспетчером, который будет обслуживать агента, т.е. предоставлять рабочие нити для агента.
Каждый агент должен быть связан с каким-то диспетчером. Назначать эту связь можно разными способами. Можно вообще не указывать явно никакой связи. Тогда SObjectizer сам свяжет агента с диспетчером по-умолчанию (все такие агенты будут работать на одной общей рабочей нити). Можно явным образом задавать dispatcher binding для каждого агента. Но, если все агенты кооперации должны работать на одном и том же диспетчере, то явное указание связи для каждого агента кооперации -- это утомительно. Поэтому можно задать связь для кооперации. И эта связь распространится на агентов кооперации.
Именно этот подход и используется здесь. Т.к. все агенты должны работать на thread_pool-диспетчере, то связь с этим диспетчером задается при создании кооперации. А все агенты, которые затем добавляются в кооперацию, будут использовать эту связь.
Небольшая техническая подробность, связанная с вызовом create_disp_binder: в обсуждаемом примере для каждого агента кооперации должна поддерживаться собственная очередь сообщений, что позволит диспетчеру распределять агентов по разным нитям из своего пула. Поэтому при создании binding-а это явным образом указывается при задании параметра fifo в виде значения fifo_t::individual.
Ну а последующие действия в create_coop тривиальны: создаются агенты, затем они добавляются в кооперацию, после чего кооперация регистрируется. Вот и все. Остальное уже дело SObjectizer-а.
Отметить можно разве лишь то, что агенты должны быть динамически созданными объектами, время жизни которых будет контролироваться SObjectizer-ом (SObjectizer сам определяет, когда агентов можно безопасно уничтожать после полной дерегистрации кооперации, которой они принадлежат). И это единственное место в обсуждаемом коде, где пользователь вынужден вручную вызвать оператор new. Да и то только потому, что в C++11 нет вспомогательной функции std::make_unique, которую добавили в C++14. Так что после перевода SObjectizer на C++14 необходимости в ручном вызове new будет еще меньше.
Почему внимание было заострено на количестве вызовов new в данном примере? Потому, что управление динамической памятью -- это одна из самых больших проблем в C++. Еще острее эта проблема становится в многопоточных приложениях. А вот в чем большой плюс SObjectizer-а, так это в том, что очень многие вопросы по работе с созданными динамически объектами, вроде агентов и сообщений, SObjectizer берет на себя, тем самым существенно упрощая жизнь пользователю.
Пожалуй, на более низкий уровень детализации опускаться сейчас нет смысла. Относительно небольшой пример продемонстрировал использование большинства ключевых особенностей SObjectizer: диспетчер, кооперация, агент с состояниями, сообщения и сигналы, перепосылка сообщений и отложенные сообщения. Это базовые вещи, кирпичики, из которых строятся гораздо более сложные и большие SObjectizer-приложения.
Само же решение проблемы контроля дедлайнов в данном примере очень простое. В реальном приложении наверняка пришлось бы делать более сложную реализацию. Например, с отменой ставших не нужными отложенными сигналами msg_check_deadline при передаче запроса из очереди агенту-performer-у. И, наверняка, в реальном прикладном коде приоритетная очередь запросов учитывала бы больше факторов для правильного упорядочения отложенных заявок. Но все эти вещи уже выходят за рамки простого примера.
Исходный код примера находится сейчас в репозитории SObjectizer-а в рабочей ветке. Для его работы необходима версия 5.5.2, которую можно загрузить с SourceForge. В официальный релиз пример simple_message_deadline будет включен при выпуске версии 5.5.3 (или 5.6.0), ориентировочно в феврале-марте 2015.
Комментариев нет:
Отправить комментарий