пятница, 6 июня 2014 г.

[prog.c++] Простая реализация хамелеонов на SObjectizer

В продолжение вот этой темы. Не знаю, когда руки дойдут до создания аналогичного теста на Qt, т.к. Qt мне сейчас нужно изучать, фактически, заново. Но вот реализацию простого теста chameneos для текущего варианта SObjectizer-5 я сделал. Код можно посмотреть в репозитории на SourceForge, или же в конце этого поста. Целью же данной заметки является рассказ и объяснение на пальцах, как тест с хамелеонами реализуется в терминах SObjectizer-овских агентов, mbox-ов и сообщений.

Ниже обсуждается простой вариант теста, который когда-то использовался в The Computer Language Benchmark Game. Сейчас там применяется более усложненная версия, chameneos_redux. В принципе, при наличии на то необходимости и интереса, можно сделать и новую версию. Но я остановился на простой, т.к. когда-то уже делал ее реализацию на нескольких языках. Соответственно, если кто-то не в курсе, что такое тест с хамелеонами, то адресую интересующихся к своей практически уже мемуарной заметке на RSDN. Там описывается и условие задачи, и грабли, на которые легко наступить (причем, не только мне, как оказалось), решая ее с помощью низкоуровневых инструментов вроде мутексов, семафоров и условных переменных. Там же есть код моих решений этой задачи на C++ (с использованием ACE) и D1 (с использованием Tango).

Ну а тем, кому интересно, как такая задача решается на SO-5.2/5.3, добро пожаловать под кат.

Данная реализация использует два типа агентов. Первый тип -- это агент, являющийся местом встречи хамелеонов (a_meeting_place_t). Но, кроме собственно проведения встреч, этот агент решает еще одну задачу -- организацию нормального завершения теста. Дело в том, что SObjectizer, если его не остановить, крутит свои циклы обработки сообщений до бесконечности. Когда приложение заканчивает свою работу, оно должно дать сигнал SObjectizer на остановку SObjectizer RunTime. В данном тесте эту команду дает агент a_meeting_place_t. В принципе, для операции shutdown можно было бы завести отдельного агента, но для простоты я решил не плодить сущности без надобности.

Итак, основная задача агента a_meeting_place_t -- это выполнение встреч хамелеонов. Соответственно, у агента a_meeting_place_t есть два очевидных состояния: место встречи свободно (и тогда любой пришедший первым хамелеон может попасть на него) и на месте встречи уже есть один хамелеон (тогда на место встречи может попасть еще только один хамелеон).

Эти состояния описываются в агенте посредством экземпляров типа so_5::rt::state_t:

   private :
      so_5::rt::state_t st_empty;
      so_5::rt::state_t st_one_creature_inside;

Далее эти объекты встретятся при обработке сообщений агентом a_meeting_place_t. А сообщений этому агенту потребуется получать два.

Первое сообщение (msg_meeting_request) -- это запрос хамелеоном разрешения на вход на место встречи. Каждый хамелеон отсылает это сообщение агенту a_meeting_place и ждет результата. Результатом может быть либо ответ о том, что встреча произошла, либо же уведомление о том, что встречи больше проводить нельзя и нужно завершать работу.

Второе сообщение (msg_shutdown_ack) -- это подтверждение от хамелеонов о том, что они завершили свою работу. Когда все хамелеоны пришлют свои подтверждения, a_meeting_place_t завершит работу SObjectizer-а.

Итак, агент a_meeting_place_t должен обрабатывать два сообщения. Для этого он должен сделать подписку на них. Подписка выполняется в специальном виртуальном методе so_define_agent(). Этот метод вызывается SObjectizer-ом во время регистрации агента внутри SObjectizer RunTime. Внутри so_define_agent() агент может выполнить специфические действия перед началом своей работы. В частности, подписаться на интересующие его сообщения.

У агента a_meeting_place_t метод so_define_agent() выглядит следующим образом:

      virtual void
      so_define_agent()
         {
            so_change_state( st_empty );

            so_subscribe( m_self_mbox ).in( st_empty )
               .event( &a_meeting_place_t::evt_first_creature );
            so_subscribe( m_self_mbox ).in( st_one_creature_inside )
               .event( &a_meeting_place_t::evt_second_creature );

            so_subscribe( m_self_mbox ).in( st_empty )
               .event( &a_meeting_place_t::evt_shutdown_ack );
         }

Сначала он указывает, что его начальным состоянием будет st_empty -- т.е. место встречи пусто и любой хамелеон, который первым пришлет msg_meeting_request, беспрепятственно попадет на место встречи.

Затем агент два раза подписывается на msg_meeting_request. Два раза потому, что в разных состояниях агент должен реагировать на это сообщение по-разному. Поэтому на это сообщение в состоянии st_empty цепляется обработчик evt_first_creature, а в состоянии st_one_creature_inside -- evt_second_creature. Эта специфическая для состояний привязка выполняется посредством парных вызовов in(состояние).event(событие).

Затем выполняется подписка на сообщение msg_shutdown_ack. По логике работы агента, эти сообщения он может получать только когда находится в состоянии st_empty.

Я говорю, что агент подписывается то на одно сообщение, то на другое, хотя в приведенном здесь фрагменте кода нигде нет названий msg_meeting_request и msg_shutdown_ack. Это не нужно, т.к. тип сообщения определяется во время компиляции из прототипа метода-обработчика. Так, у метода evt_first_creature вот такой прототип:

      void
      evt_first_creature(
         const so_5::rt::event_data_t< msg_meeting_request > & evt )

И именно из прототипа обработчика SObjectizer понимает сообщение какого типа будет приводить к возникновению данного события.

Обработчики событий a_meeting_place_t довольно простые. Обработчик события evt_first_creature проверяет, можно ли проводить встречи или лимит исчерпан. Если исчерпан, то хамелеону дается команда завершить свою работу. Если же встречи еще можно проводить, то a_meeting_place_t сохраняет у себя информацию об этом хамелеоне и его цвете. А так же переходит в состояние st_one_creature_inside, что указывает, что следующее сообщение msg_meeting_request будет обрабатываться уже по-другому.

Обработчик события evt_second_creature сообщает каждому из двух встретившихся хамелеонов цвет второго хамелеона. Уменьшает количество оставшихся встреч и возвращается в состояние st_empty.

Обработчик события evt_shutdown_ack подсчитывает общее количество встреч и следит за тем, есть ли еще живые хамелеоны. Если живых хамелеонов больше не осталось, то работа SObjectizer-а завершается.

Совершенно ничего сложного :) Единственная хитрость -- это то, что evt_shutdown_ack всегда отрабатывает только в состоянии st_empty. Обеспечивается это логикой работы агента. Агент отсылает хамелеонам команду на завершение работы только будучи в состоянии st_empty -- эти действия выполняются в evt_first_creature, это событие запускается только в st_empty и при таком сценарии перехода в какое-то другое состояния не производится.

Теперь нужно прояснить, как же именно агенты идентифицируют друг друга и отсылают друг другу сообщения.

В SObjectizer-5 есть специальное понятие -- mbox (почтовый ящик). Сообщения отсылаются не агентам напрямую, а в mbox-ы. А вот агент может быть подписан на содержимое mbox-а. И когда в mbox попадает сообщение, то оно отдается всем подписавшимся на него агентам.

В SObjectizer-5 поддерживаются два типа mbox-ов: простые анонимные mbox-ы, которые и были задействованы в данном примере, а так же именованные mbox-ы. Но именованные mbox-ы несколько похитрее устроены и их лучше обсудить как-нибудь в другой раз.

Итак, для отсылки и получения сообщений агентам нужны mbox-ы. Агенты могут самостоятельно их создавать, а могут получать откуда-то тем или иным способом. В данном примере с каждым агентом связан свой mbox. Про mbox агента a_meeting_place_t знают все. А вот агент a_meeting_place_t изначально не знает mbox-ов агентов-хамелеонов. Но a_meeting_place_t должен как-то посылать сообщения этим агентам... Как же он будет это делать?

Очень просто: агенты-хамелеоны сами будут сообщать свои mbox-ы агенту a_meeting_place_t в сообщении msg_meeting_request:

struct msg_meeting_request : public so_5::rt::message_t
   {
      so_5::rt::mbox_ref_t m_who;
      color_t m_color;

Фактически, работа с mbox-ами в SObjectizer ведется через умные указатели, что позволяет легко передавать mbox-ы в сообщениях или сохранять внутри агента. Так, агент a_meeting_place_t в evt_first_creature сохраняет у себя информацию о первом хамелеоне копируя к себе ссылку на mbox и цвет хамелеона:

            if( m_remaining_meetings )
            {
               so_change_state( st_one_creature_inside );

               m_first_creature_mbox = evt->m_who;
               m_first_creature_color = evt->m_color;

А затем использует сохраненный mbox первого хамелеона для отсылки ему результата встречи:

      void
      evt_second_creature(
         const so_5::rt::event_data_t< msg_meeting_request > & evt )
         {
            evt->m_who->deliver_message(
                  new msg_meeting_result( m_first_creature_color ) );
            m_first_creature_mbox->deliver_message(
                  new msg_meeting_result( evt->m_color ) )
;

Агент-хамелеон, представленный типом a_creature_t, еще проще. Он не имеет явно выделенных состояний. И обрабатывает всего два сообщения: результат встречи (сообщение msg_meeting_result) и команду на завершение работы (сообщение msg_shutdown_request). Поэтому метод so_define_agent() и события evt_meeting_result/evt_shutdown_request у него очень простые.

Самый важный момент с a_creature_t в том, что он должен начать работать сразу же после того, как окажется зарегистрированным в SObjectizer RunTime. Т.е. как только SObjectizer закончит все свои действия по регистрации нового агента, он должен как-то этого агента "толкнуть". В SObjectizer это происходит посредством специального внутреннего сообщения, которое отсылается агенту и которое приводит к вызову виртуального метода so_evt_start. Агент a_creature_t пользуется этим и в своем so_evt_start сразу же отсылает запрос на вход на место встречи:

      virtual void
      so_evt_start()
         {
            m_meeting_place_mbox->deliver_message(
                  new msg_meeting_request( m_self_mbox, m_color ) );
         }

А вот агенту a_meeting_place_t ничего не нужно делать сразу после регистрации. Поэтому у него метод so_evt_start не определен.

Общая же картина работы получается следующей. Одновременно стартуют пять агентов -- один a_meeting_place_t и четыре a_creature_t. Агенты хамелеоны сразу же отсылают по одному сообщению msg_meeting_request агенту a_meeting_place. Эти сообщения выстраиваются в очередь. Агент a_meeting_place берет первое из них и, поскольку он находится в состоянии st_empty, обрабатывает его событием evt_first_creature. После чего берет второе сообщение, но обрабатывает его событием evt_second_creature, т.к. находится в состоянии st_one_creature_inside. Двум агентам-хамелеонам отсылаются сообщения msg_meeting_result, a_meeting_place возвращается в состояние st_empty. Берется следующее сообщение из очереди... И т.д.

Параллельно этому агенты-хамелеоны обрабатывают msg_meeting_result и шлют новые запросы msg_meeting_request, скапливающиеся в очереди a_meeting_place.

Все это происходит до тех пор, пока не исчерпается лимит встреч. Как только лимит будет исчерпан, вместо msg_meeting_result хамелеоны получат msg_shutdown_request. Они ответят посредством msg_shutdown_ack. И, когда a_meeting_place соберет все msg_shutdown_ack, работа теста завершится.

Теперь следующий важный вопрос: а кто обеспечивает то, что каждый агент работает на собственной нити? Ведь в коде агентов об этом вообще ничего не говорит?

Для ответа на вопрос нужно посмотреть, как именно запускается SObjectizer RunTime и как регистрируются тестовые агенты.

Одно из главнейших отличий SObjectizer-5 от SObjectizer-4 состоит в том, что в SO5 внутри одного приложения можно запустить произвольное количество SObjectizer RunTime-ов. Каждый RunTime будет представлен объектом so_environment_t, а созданием RunTime занимается семейство функций run_so_environment*. В данном примере использована одна из них:

int
main( int argc, char ** argv )
{
   try
   {
      const int meetings = 2 == argc ? std::atoi( argv[1] ) : 10;

      so_5::api::run_so_environment_with_parameter(
            &init,
            meetings,
            std::move(
                  so_5::rt::so_environment_params_t()
                        .add_named_dispatcher(
                              "active_obj",
                              so_5::disp::active_obj::create_disp() ) ) );

Данная функция создает RunTime, вызывает пользовательскую инициализирующую фукнцию init и передает ей ссылку на созданный RunTime. В данном случае важно то, что при вызове run_so_environment_with_parameter() передается важный дополнительный параметр для нового RunTime -- создается специальный диспетчер с активными объектами с именем active_obj. Этот диспетчер для всех своих агентов будет создавать отдельную нить. Нужно только обеспечить, чтобы агент был привязан к этому диспетчеру.

А эта привязка декларируется вот в этом месте:

void
init(
   so_5::rt::so_environment_t & env,
   int meetings )
   {
      color_t creature_colors[ CREATURE_COUNT ] =
         { BLUE, RED, YELLOW, BLUE };

      auto meeting_place_mbox = env.create_local_mbox();

      auto coop = env.create_coop( "chameneos",
            so_5::disp::active_obj::create_disp_binder(
                  "active_obj" ) )
;

Все агенты SObjectizer должны принадлежать какой-нибудь кооперации. Могут быть кооперации из одного агента (вырожденный случай), а могут содержать сотни тысяч агентов. В данном тесте одна кооперация включает в себя всех создаваемых агентов.

Кооперация отвечат за то, чтобы в момент регистрации агенты были правильно обработаны SObjectizer-ом. В частности, каждому агенту должен быть назначен диспетчер -- это называется привязка к диспетчеру. В данном случае все агенты должны быть привязаны к одному и тому же диспетчеру с активными нитями. Поэтому при создании кооперации задается связующий объект (называемый disp_binder), который знает, как связать агента с нужным диспетчером. Благодаря этому объекту все агенты во время своей регистрации будут связаны с нужным диспетчером и получат в свое распоряжение собственную, отдельную рабочую нить.

Вот, собственно, и все. Какой-то сложности в прикладной логике в данном случае нет. Есть, наверное, некоторая многословность, которая обусловлена как особенностями C++, так и спецификой SObjectizer-а. Над этим мы тоже работаем. Получается не быстро, но в SObjectizer-4 было еще многословнее :)

Про производительность, сравнимую с производительностью специально заточенных под эту задачу примеров говорить особо не приходится. Т.к. в хардкорных решениях работа идет "по месту", никаких динамических объектов не создается. Здесь же очень высокие расходы на создание-удаление сообщений, их хранение в очередях и т.д.

Хотя как знать, как знать. Вот эта старая реализация у меня сейчас показывает такие результаты:
$ time _gcc_x_x_generic/release/sample.so_5.chameneos_simple_raw.exe 5000000
10000000

real    0m41.323s
user    0m0.000s
sys     0m0.000s


Тогда как новая на SO-5.3:
$ time _gcc_x_x_generic/release/sample.so_5.chameneos_simple.exe 5000000
Creatures met: Creatures met: 2500043Creatures met: 2500213

2499781
Creatures met: 2499963
Total: 10000000

real    0m17.481s
user    0m0.000s
sys     0m0.000s

Мусорная печать во втором случае -- это результат параллельного вывода на консоль из разных потоков.

Оба теста скомпилированы 64-bit GCC 4.8.3 (MinGW-w64) и ACE 6.2.6. Win 8.1 64-bit, Core i7 2.4GHz.

Update 2014-06-09. Ув.тов.Игорь Мирончик сделал реализацию этого теста на Qt, вот здесь можно увидеть полный исходный текст реализации. По его замерам на 500K встреч SObjectizer показывает результат ~11 секунд, Qt-шный вариант -- ~104 секунды.


Ну и вот полный исходный текст теста:

#include <iostream>
#include <iterator>
#include <numeric>

#include <ace/OS.h>

#include <so_5/rt/h/rt.hpp>
#include <so_5/api/h/api.hpp>

#include <so_5/disp/active_obj/h/pub.hpp>

enum color_t
   {
      BLUE = 0,
      RED = 1,
      YELLOW = 2,
      FADED = 3
   };

struct msg_meeting_request : public so_5::rt::message_t
   {
      so_5::rt::mbox_ref_t m_who;
      color_t m_color;

      msg_meeting_request(
         const so_5::rt::mbox_ref_t & who,
         color_t color )
         :  m_who( who )
         ,  m_color( color )
         {}
   };

struct msg_meeting_result : public so_5::rt::message_t
   {
      color_t m_color;

      msg_meeting_result( color_t color )
         :  m_color( color )
         {}
   };

struct msg_shutdown_request : public so_5::rt::signal_t {};

struct msg_shutdown_ack : public so_5::rt::message_t
   {
      int m_creatures_met;

      msg_shutdown_ack( int creatures_met )
         :  m_creatures_met( creatures_met )
         {}
   };

class a_meeting_place_t
   :  public so_5::rt::agent_t
   {
   public :
      a_meeting_place_t(
         so_5::rt::so_environment_t & env,
         const so_5::rt::mbox_ref_t & self_mbox,
         int creatures,
         int meetings )
         :  so_5::rt::agent_t( env )
         ,  st_empty( self_ptr(), "st_empty" )
         ,  st_one_creature_inside( self_ptr(), "st_one_creature_inside" )
         ,  m_self_mbox( self_mbox )
         ,  m_creatures_alive( creatures )   
         ,  m_remaining_meetings( meetings )
         ,  m_total_meetings( 0 )
         {}

      virtual void
      so_define_agent()
         {
            so_change_state( st_empty );

            so_subscribe( m_self_mbox ).in( st_empty )
               .event( &a_meeting_place_t::evt_first_creature );
            so_subscribe( m_self_mbox ).in( st_one_creature_inside )
               .event( &a_meeting_place_t::evt_second_creature );

            so_subscribe( m_self_mbox ).in( st_empty )
               .event( &a_meeting_place_t::evt_shutdown_ack );
         }

      void
      evt_first_creature(
         const so_5::rt::event_data_t< msg_meeting_request > & evt )
         {
            if( m_remaining_meetings )
            {
               so_change_state( st_one_creature_inside );

               m_first_creature_mbox = evt->m_who;
               m_first_creature_color = evt->m_color;
            }
            else
               evt->m_who->deliver_signal< msg_shutdown_request >();
         }

      void
      evt_second_creature(
         const so_5::rt::event_data_t< msg_meeting_request > & evt )
         {
            evt->m_who->deliver_message(
                  new msg_meeting_result( m_first_creature_color ) );
            m_first_creature_mbox->deliver_message(
                  new msg_meeting_result( evt->m_color ) );

            --m_remaining_meetings;

            so_change_state( st_empty );
         }

      void
      evt_shutdown_ack(
         const so_5::rt::event_data_t< msg_shutdown_ack > & evt )
         {
            m_total_meetings += evt->m_creatures_met;
            
            if0 >= --m_creatures_alive )
            {
               std::cout << "Total: " << m_total_meetings << std::endl;

               so_environment().stop();
            }
         }

   private :
      so_5::rt::state_t st_empty;
      so_5::rt::state_t st_one_creature_inside;

      const so_5::rt::mbox_ref_t m_self_mbox;

      int m_creatures_alive;
      int m_remaining_meetings;
      int m_total_meetings;

      so_5::rt::mbox_ref_t m_first_creature_mbox;
      color_t m_first_creature_color;
   };

class a_creature_t
   :  public so_5::rt::agent_t
   {
   public :
      a_creature_t(
         so_5::rt::so_environment_t & env,
         const so_5::rt::mbox_ref_t & meeting_place_mbox,
         color_t color )
         :  so_5::rt::agent_t( env )
         ,  m_meeting_place_mbox( meeting_place_mbox )
         ,  m_self_mbox( env.create_local_mbox() )
         ,  m_meeting_counter( 0 )
         ,  m_color( color )
         {}

      virtual void
      so_define_agent()
         {
            so_subscribe( m_self_mbox )
               .event( &a_creature_t::evt_meeting_result );

            so_subscribe( m_self_mbox )
               .event( &a_creature_t::evt_shutdown_request );
         }

      virtual void
      so_evt_start()
         {
            m_meeting_place_mbox->deliver_message(
                  new msg_meeting_request( m_self_mbox, m_color ) );
         }

      void
      evt_meeting_result(
         const so_5::rt::event_data_t< msg_meeting_result > & evt )
         {
            m_color = complement( evt->m_color );
            m_meeting_counter++;

            m_meeting_place_mbox->deliver_message(
                  new msg_meeting_request( m_self_mbox, m_color ) );
         }

      void
      evt_shutdown_request(
         const so_5::rt::event_data_t< msg_shutdown_request > & )
         {
            m_color = FADED;
            std::cout << "Creatures met: " << m_meeting_counter << std::endl;

            m_meeting_place_mbox->deliver_message(
                  new msg_shutdown_ack( m_meeting_counter ) );
         }

   private :
      const so_5::rt::mbox_ref_t m_meeting_place_mbox;
      const so_5::rt::mbox_ref_t m_self_mbox;

      int m_meeting_counter;

      color_t m_color;

      color_t
      complement( color_t other ) const
         {
            switch( m_color )
               {
               case BLUE:
                  return other == RED ? YELLOW : RED;
               case RED:
                  return other == BLUE ? YELLOW : BLUE;
               case YELLOW:
                  return other == BLUE ? RED : BLUE;
               default:
                  break;
               }
            return m_color;
         }
   };

const std::size_t CREATURE_COUNT = 4;

void
init(
   so_5::rt::so_environment_t & env,
   int meetings )
   {
      color_t creature_colors[ CREATURE_COUNT ] =
         { BLUE, RED, YELLOW, BLUE };

      auto meeting_place_mbox = env.create_local_mbox();

      auto coop = env.create_coop( "chameneos",
            so_5::disp::active_obj::create_disp_binder(
                  "active_obj" ) );

      coop->add_agent(
            new a_meeting_place_t(
                  env,
                  meeting_place_mbox,
                  CREATURE_COUNT,
                  meetings ) );
      
      for( std::size_t i = 0; i != CREATURE_COUNT; ++i )
         {
            coop->add_agent(
                  new a_creature_t(
                        env,
                        meeting_place_mbox,
                        creature_colors[ i ] ) );
         }

      env.register_coop( std::move( coop ) );
   }

int
main( int argc, char ** argv )
{
   try
   {
      const int meetings = 2 == argc ? std::atoi( argv[1] ) : 10;

      so_5::api::run_so_environment_with_parameter(
            &init,
            meetings,
            std::move(
                  so_5::rt::so_environment_params_t()
                        .add_named_dispatcher(
                              "active_obj",
                              so_5::disp::active_obj::create_disp() ) ) );
   }
   catchconst std::exception & ex )
   {
      std::cerr << "Error: " << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

И, небольшим довеском, в качестве рекламы нормальных build-инструментов (вроде Mxx_ru и SCons-а), проектный файлик для него:

require 'mxx_ru/cpp'

MxxRu::Cpp::exe_target {

   required_prj 'ace/dll.rb'
   required_prj 'so_5/prj.rb'
   target 'sample.so_5.chameneos_simple'

   cpp_source 'main.cpp'
}

Комментариев нет: