пятница, 25 июля 2014 г.

[prog.c++] Небольшая демонстрация поведения разных диспетчеров в SObjectizer

В качестве демонстрации диспетчеров thread_pool и adv_thread_pool сделал сегодня небольшой пример, который имитирует некую длительную, блокирующую рабочую нить работу. В примере создается три агента:

Агент-менеджер, который отсылает N сообщений do_hardwork агенту-исполнителю, получает от исполнителя отчеты о проделанной работе (hardwork_done). Получая каждый экзепляр hardwork_done агент-менеджер отсылает сообщение check_hardwork третьему агенту -- агенту-контроллеру. В ответ контроллер должен прислать hardwork_checked. Когда на все N сообщений do_hardwork будут получены N сообщений hardwork_done и hardwork_checked, агент-менеджер отображает отчет о затраченном времени и завершает работу.

Агенты исполнитель и контроллер в своих обработчиках сообщений do_hardwork и check_hardwork для имитации занятости вычислительного ресурса просто засыпают на указанное количество миллисекунд. А после пробуждения отсылают менеджеру hardwork_done и hardwork_checked соответственно. Поскольку агенты засыпают на рабочих нитях диспетчера, то диспетчер не может их переиспользовать, пока агенты не проснуться. Т.е. выполняется такая дешевая имитация какой-то длительной работы (например, выполнение какого-то синхронного вызова, вроде обращения к БД или записи данных в сокет).

Пример позволяет выбрать, на каком диспетчере будут работать агенты. Можно указать либо one_thread, либо active_obj (каждый агент будет работать на своей собственной нити), либо thread_pool/adv_thread_pool. В случае thread_pool и adv_thread_pool агенты работают независимо друг от друга, в режиме individual FIFO. В случае active_obj диспетчера будет создано три рабочих нити (по количеству агентов). В случае thread_pool/adv_thread_pool будет создано столько рабочих нитей, сколько ядер определят C++ный рантайм (посредством std::thread::hardware_concurrency).

Соответственно, можно увидеть, как на суммарном времени работы теста сказывается возможность задействовать для обработки разное количество потоков. При использовании one_thread-диспетчера время получается строго линейным (т.е. количество запросов умноженное на величину тайм-аута и еще умноженное на два, т.к. тайм-ауты отсчитывают и агент-исполнитель, и агент-контроллер). У меня при параметрах 200 запросов и 20-мс тайм-ауте время составляет около 8 секунд.

Диспетчеры active_obj и thread_pool на 4-х ядерной машине дают время порядка 4 секунд. Т.е. за счет параллельной обработки запросов do_hardwork и check_hardwork получается просто результат умножения количества запросов на время выполнения одного запроса.

А вот adv_thread_pool на 4-х ядерной машине позволяет запустить 4-ре рабочих потока и распределить обработку всех do_hardwork и check_hardwork запросов между всеми рабочими потоками. Что сокращает время обработки теста до 2-х секунд.

Что хотелось бы отметить: сами агенты в тесте вообще не знают, на каких именно диспетчерах они будут работать. Ничего специализированного и заточенного под конкретный диспетчер в их коде нет. Разве что агенты исполнитель и контроллер указывают, что свои сообщения они обрабатывают в thread-safe методах. Что и позволяет получить эффект от распараллеливания на adv_thread_pool диспетчере.

Исходный код примера под катом. Он без комментариев в коде, но там настолько все тривиально (для меня, по крайней мере), что я не нашел, что и где прокомментировать. Но, если у кого-то будут вопросы или непонятки по коду, не стесняйтесь спросить, с удовольствием отвечу и поясню что к чему.

/*
 * Sample for demonstrating results of some hard work on different
 * dispatchers.
 */

#include <iostream>
#include <chrono>
#include <cstdlib>

#include <ace/OS.h>

#include <so_5/rt/h/rt.hpp>
#include <so_5/api/h/api.hpp>

#include <so_5/disp/one_thread/h/pub.hpp>
#include <so_5/disp/active_obj/h/pub.hpp>
#include <so_5/disp/thread_pool/h/pub.hpp>
#include <so_5/disp/adv_thread_pool/h/pub.hpp>

struct msg_do_hardwork : public so_5::rt::message_t
{
   unsigned int m_index;
   unsigned int m_milliseconds;

   msg_do_hardwork(
      unsigned int index,
      unsigned int milliseconds )
      :  m_index( index )
      ,  m_milliseconds( milliseconds )
   {}
};

struct msg_hardwork_done : public so_5::rt::message_t
{
   unsigned int m_index;

   msg_hardwork_done( unsigned int index )
      :  m_index( index )
   {}
};

struct msg_check_hardwork : public so_5::rt::message_t
{
   unsigned int m_index;
   unsigned int m_milliseconds;

   msg_check_hardwork(
      unsigned int index,
      unsigned int milliseconds )
      :  m_index( index )
      ,  m_milliseconds( milliseconds )
   {}
};

struct msg_hardwork_checked : public so_5::rt::message_t
{
   unsigned int m_index;

   msg_hardwork_checked(
      unsigned int index )
      :  m_index( index )
   {}
};

class a_manager_t : public so_5::rt::agent_t
{
   public :
      a_manager_t(
         so_5::rt::so_environment_t & env,
         const so_5::rt::mbox_ref_t & worker_mbox,
         const so_5::rt::mbox_ref_t & checker_mbox,
         unsigned int requests,
         unsigned int milliseconds )
         :  so_5::rt::agent_t( env )
         ,  m_worker_mbox( worker_mbox )
         ,  m_checker_mbox( checker_mbox )
         ,  m_requests( requests )
         ,  m_milliseconds( milliseconds )
      {}

      virtual void
      so_define_agent() override
      {
         so_subscribe( so_direct_mbox() )
            .event( &a_manager_t::evt_hardwork_done )
            .event( &a_manager_t::evt_hardwork_checked );
      }

      virtual void
      so_evt_start() override
      {
         m_start_time = std::chrono::steady_clock::now();

         forunsigned int i = 0; i != m_requests; ++i )
         {
            m_worker_mbox->deliver_message(
                  new msg_do_hardwork { i, m_milliseconds } );
         }
      }

      void
      evt_hardwork_done( const msg_hardwork_done & evt )
      {
         m_checker_mbox->deliver_message(
               new msg_check_hardwork { evt.m_index, m_milliseconds } );
      }

      void
      evt_hardwork_checked( const msg_hardwork_checked & evt )
      {
         ++m_processed;

         if( m_processed == m_requests )
         {
            auto finish_time = std::chrono::steady_clock::now();

            auto duration =
                  std::chrono::duration_cast< std::chrono::milliseconds >(
                        finish_time - m_start_time ).count() / 1000.0;

            std::cout << "Working time: " << duration << "s" << std::endl;

            so_environment().stop();
         }
      }

   private :
      const so_5::rt::mbox_ref_t m_worker_mbox;
      const so_5::rt::mbox_ref_t m_checker_mbox;

      const unsigned int m_requests;
      unsigned int m_processed = 0;

      const unsigned int m_milliseconds;

      std::chrono::steady_clock::time_point m_start_time;
};

so_5::rt::agent_coop_unique_ptr_t
create_test_coop(
   so_5::rt::so_environment_t & env,
   so_5::rt::disp_binder_unique_ptr_t disp_binder,
   unsigned int requests,
   unsigned int milliseconds )
{
   auto c = env.create_coop( "test", std::move( disp_binder ) );

   auto worker_mbox = env.create_local_mbox();
   auto checker_mbox = env.create_local_mbox();

   auto a_manager = c->add_agent(
         new a_manager_t(
               env,
               worker_mbox,
               checker_mbox,
               requests,
               milliseconds ) );

   c->define_agent()
      .event( worker_mbox,
            [a_manager]( const msg_do_hardwork & evt )
            {
               std::this_thread::sleep_for(
                     std::chrono::milliseconds( evt.m_milliseconds ) );

               a_manager->so_direct_mbox()->deliver_message(
                     new msg_hardwork_done { evt.m_index } );
            },
            so_5::thread_safe );

   c->define_agent()
      .event( checker_mbox,
            [a_manager]( const msg_check_hardwork & evt )
            {
               std::this_thread::sleep_for(
                     std::chrono::milliseconds( evt.m_milliseconds ) );

               a_manager->so_direct_mbox()->deliver_message(
                     new msg_hardwork_checked { evt.m_index } );
            },
            so_5::thread_safe );

   return c;
}

struct dispatcher_factories_t
{
   std::function< so_5::rt::dispatcher_unique_ptr_t() > m_disp_factory;
   std::function< so_5::rt::disp_binder_unique_ptr_t() > m_binder_factory;
};

dispatcher_factories_t
make_dispatcher_factories(
   const std::string & type,
   const std::string & name )
{
   dispatcher_factories_t res;

   if"active_obj" == type )
   {
      using namespace so_5::disp::active_obj;
      res.m_disp_factory = []() { return create_disp(); };
      res.m_binder_factory = [name]() { return create_disp_binder( name ); };
   }
   else if"thread_pool" == type )
   {
      using namespace so_5::disp::thread_pool;
      res.m_disp_factory = []() { return create_disp(); };
      res.m_binder_factory = 
         [name]() {
            return create_disp_binder(
                  name,
                  []( params_t & p ) { p.fifo( fifo_t::individual ); } );
         };
   }
   else if"adv_thread_pool" == type )
   {
      using namespace so_5::disp::adv_thread_pool;
      res.m_disp_factory = []() { return create_disp(); };
      res.m_binder_factory = 
         [name]() {
            return create_disp_binder(
                  name,
                  []( params_t & p ) { p.fifo( fifo_t::individual ); } );
         };
   }
   else if"one_thread" == type )
   {
      using namespace so_5::disp::one_thread;

      res.m_disp_factory = []() { return create_disp(); };
      res.m_binder_factory = [name]() { return create_disp_binder( name ); };
   }
   else
      throw std::runtime_error( "unknown type of dispatcher: " + type );


   return res;
}

struct config_t
{
   dispatcher_factories_t m_factories;
   unsigned int m_requests;
   unsigned int m_milliseconds;

   static const std::string dispatcher_name;
};

const std::string config_t::dispatcher_name = "dispatcher";

config_t
parse_params( int argc, char ** argv )
{
   if1 == argc )
      throw std::runtime_error( "no arguments given!\n\n"
            "usage:\n\n"
            "sample.so_5.hardwork_imit <disp_type> [requests] [worktime_ms]" );

   config_t r {
         make_dispatcher_factories( argv[ 1 ], config_t::dispatcher_name ),
         200,
         15
      };

   if2 < argc )
      r.m_requests = std::atoi( argv[ 2 ] );
   if3 < argc )
      r.m_milliseconds = std::atoi( argv[ 3 ] );

   std::cout << "Config:\n"
      "\t" "dispatcher: " << argv[ 1 ] << "\n"
      "\t" "requests: " << r.m_requests << "\n"
      "\t" "worktime (ms): " << r.m_milliseconds << std::endl;

   return r;
}

int main( int argc, char ** argv )
{
   try
   {
      const config_t config = parse_params( argc, argv );

      so_5::api::run_so_environment(
         [config]( so_5::rt::so_environment_t & env )
         {
            env.register_coop(
                  create_test_coop(
                        env,
                        config.m_factories.m_binder_factory(),
                        config.m_requests,
                        config.m_milliseconds ) );
         },
         [config]( so_5::rt::so_environment_params_t & params )
         {
            params.add_named_dispatcher(
                  config_t::dispatcher_name,
                  config.m_factories.m_disp_factory() );
         } );

      return 0;
   }
   catchconst std::exception & x )
   {
      std::cerr << "Exception: " << x.what() << std::endl;
   }

   return 2;
}

Комментариев нет: