понедельник, 20 октября 2014 г.

[prog.c++] Пример решения проблемы Producer-Consumer в SO-5.5.1

В версии 5.5.1 состав штатных примеров SObjectizer-а был расширен несколькими новыми. Среди них пример под названием work_generation, который демонстрирует один из возможных вариантов решения проблемы Producer-Consumer средствами SObjectizer.

В примере решается следующая задача: есть несколько генераторов прикладных запросов. Эти генераторы производят нагрузку случайным образом -- когда-то больше, когда-то меньше. Есть несколько исполнителей запросов. Они тратят свое время на обработку сгенерированных запросов. Это время так же определяется случайным образом: иногда исполнители успевают разгрести все полученные ими запросы, иногда не успевают. Смысл задачи в том, чтобы в ситуации, когда исполнители не успевают разгребать запросы, нигде не росли очереди сообщений. Поскольку стоит такому росту начаться, с большой долей вероятности он уже не закончится, т.к. рост очередей будет способствовать расходу памяти, расход памяти будет приводить к замедлению обработки запросов, замедление обработки будет вести дальнейшему росту очередей и т.д. по замкнутому кругу.

Можно сказать, что этот пример показывает вполне работоспособную и опробованную в свое время на практике схему защиты агентов от перегрузки. Некоторое время назад эта тема в очередной раз обсуждалась на предмет внедрения в ядро SObjectizer каких-то средств обеспечения overload control (#1, #2, #3). Но, в очередной раз, до практических шагов не дошло, т.к. хорошая защита от перегрузки очень тесно завязывается на специфику конкретной прикладной задачи. И, поэтому, лучше всего может быть решена в прикладном коде. А данный пример показывает, как уже имеющиеся возможности SObjectizer позволяют строить подобные решения.

Пример work_generation работает следующим образом:

  • есть несколько агентов генераторов, они случайным образом распределяют прикладные запросы на потребителей;
  • каждый потребитель состоит из пары агентов: receiver и processor. Роли между ними распределены следующим образом:
    • агент receiver получает новые запросы от генераторов и сохранят их в своем буфере фиксированного размера. То, что не помещается в буфер, отбрасывается. Тем самым выполняется защита исполнителя от слишком большого потока запросов;
    • агент processor забирает у receiver-а буфер с накопленными запросами и имитирует их обработку. Когда обработка завершается, processor вновь обращается к receiver-у за следующим буфером, содержимое которого накопилось за время обработки предыдущей пачки заявок.

Такая имитация длится 20 секунд, после чего работа примера завершается.

Агенты из примера распределяются по диспетчерам следующим образом:

  • агенты-генераторы работают на своем thread_pool диспетчере. Не смотря на то, что генераторы входят в одну кооперацию, они используют индивидуальный fifo-режим, т.е. каждый работает независимо от других агентов;
  • агенты receiver-ы работают на своем thread_pool диспетчере. Эти агенты принадлежат разным кооперациям, но работают на общем для receiver-ов пуле потоков;
  • агенты producer-ы работают на active_obj диспетчере, т.е. у каждого такого агента своя собственная рабочая нить.

Такое разделение по диспетчерам позволяет каждой из сторон работать независимо друг от друга (особенно это важно для receiver-ов и processor-ов). Главным способом балансировки нагрузки на processor-ов являются размеры буферов в агентах receiver-ах. Как только буфер заполнен, новые запросы не принимаются, а отвергаются. В большинстве задач, которые мне приходилось решать ранее с помощью SObjectizer-а, это было приемлимо, т.к. отправитель запроса повторял запрос при отсутствии ответа на него. В случаях же, когда просто выбросить запрос нельзя, а нужно сгенерировать какой-то осмысленный ответ, можно заставить receiver-а возвращать какой-то специализированный результат обработки запроса, который бы говорил отправителю, что система временно перегружена и запрос не может быть полностью обработан.

Все агенты receiver-ы вынесены на общий пул потоков из-за того, что каждый из них выполняет очень короткие операции. Поэтому эти агенты работают очень быстро и не мешают друг другу. В то же время каждый processor работает на своей собственной нити, что позволяет processor-ам выполнять длительные синхронные операции (например, общаться с СУБД) не боясь заблокировать других агентов.

В работе примера два раза задействовано синхронное взаимодействие агентов. В первый раз при передаче нового запроса от агента-генератора агенту receiver-у. Здесь синхронность нужна для двух вещей: во-первых, это еще один способ сдержать слишком "шустрого" генератора. Т.е. если генератор пытается передать запрос receiver-у, который уже принимает заявки от других генераторов, то генератор будет приостановлен до тех пор, пока его заявка не будет принята или отклонена. Во-вторых, синхронное взаимодействие в этом месте упрощает передачу информации о том, что у receiver-а буфер переполнен и нет смысла давать этому receiver-у новые запросы. Без синхронного обращения к receiver-у пришлось бы придумывать какой-то другой способ обратной связи, либо же вообще тупо слать новые запросы наудачу, не интересуясь тем, приняты они или нет (такой подход вполне имеет право на жизнь, но он чреват переполнением очередей SObjectizer-а).

Второй раз синхронное взаимодействие используется агентом processor-ом для того, чтобы забрать у receiver-а буфер с принятыми запросами. Это взаимодействие упрощает передачу информации от receiver-а к processor-у: не нужно придумывать ответные сообщения, receiver просто возвращает буфер из своего обработчика сообщения processor-а. А за счет move semantic, которая есть в C++ных std::vector-ах, нет потери эффективности в передаче данных от одного агента другому (при этом информация, фактически передается из одной рабочей нити в другую под mutex-ом, о котором пользователь даже не догадывается и о котором пользователю не нужно думать).

Однако, синхронное взаимодействие -- вещь далеко не самая безопасная. Исключения могут возникнуть в любой момент и по любому поводу :) Например, проигнорировал агент-получатель сообщение-запрос (скажем, не обрабатывает его в своем текущем состоянии) -- исключение. Вообще оказался дерегистрированным на момент прихода сообщения-запроса -- исключение. Чуть-чуть опоздал с обработкой -- исключение :) Поэтому все места в примере, где задействуются синхронные запросы, оформлены в отдельные методы с try-catch обрамлением места синхронного обращения к другому агенту.

Думаю, на этом вводную часть можно завершить и показать несколько фрагментов кода из агентов с пояснениями. Весь же код примера приведен в конце поста.

Агент-генератор работает по тактовой схеме. На каждом такте генерирует случайное количество запросов и случайным образом распределяет из между получателями. Причем, если получатель оказывается переполненным и отказывается принимать запросы, то на этом такте нагрузка на него больше не отправляется. После раскидывания всех запросов (либо после того, как все получатели отказались принимать запросы), агент-генератор инициирует новый такт после паузы, чья длительность так же вычисляется случайным образом:

void
evt_next_turn()
{
   // How many requests will be sent on this turn.
   const int requests = random( 1100 );

   TRACE() << "GEN(" << m_name << ") turn started, requests="
         << requests << std::endl;

   // We need copy of workers list to modify it if
   // some worker rejects our requests.
   std::vector< so_5::rt::mbox_t > live_workers( m_workers_mboxes );
   int sent = 0;
   // If there is no active workers there is no need to continue.
   while( sent < requests && !live_workers.empty() )
   {
      if( generate_next_request( live_workers ) )
         ++sent;
   }

   // How much to sleep until next turn.
   const auto next_turn_pause = std::chrono::milliseconds( random(050) );

   TRACE() << "GEN(" << m_name << ") requests generated="
         << sent << ", will sleep for "
         << next_turn_pause.count() << "ms" << std::endl;

   so_5::send_delayed_to_agent< msg_next_turn >( *this, next_turn_pause );
}

Внутри generate_next_request нет ничего интересного. За исключением, пожалуй, момента передачи нового запроса агенту receiver-у. Этот момент можно показать полностью чтобы проиллюстрировать осторожное обращение с синхронными запросами:

bool
push_request_to_receiver(
   const so_5::rt::mbox_t & to,
   std::unique_ptr< application_request > req )
{
   // There is a plenty of room for any errors related to
   // synchronous invocation. Catch them all and treat them
   // as inabillity of worker to process request.
   try
   {
      return to->get_one< bool >()
            .wait_for( std::chrono::milliseconds( 10 ) )
            .sync_get( std::move( req ) );
   }
   catchconst std::exception & x )
   {
      TRACE()<< "GEN(" << m_name << ") failed to push request: "
            << x.what() << std::endl;
   }

   return false;
}

Тут видно, что генератор пытается "вызвать" метод receiver-а, который принимает application_request, а возвращает bool -- true, если потребителю можно продолжать слать запросы, или false в противном случае. Любая ошибка в процессе "вызова" трактуется как невозможность получателя принять запрос.

Такой подход к обработке ошибок синхронного обращения к другому агенту оправдывает себя, например, в процессе завершения работы примера. Вполне возможна ситуация, когда кооперации с потребителями уже дерегистрированы, а агент-генератор продолжает работу на своем такте. Попытка отослать синхронный запрос к дерегистрированному получателю приведет к ошибке "broken promise", генерируемой стандартной библиотекой C++. Что будет видно по следу работы приложения, например:


GEN(g2) failed to push request: broken promise
GEN(g2) failed to push request: broken promise
GEN(g2) failed to push request: broken promise
GEN(g2) failed to push request: broken promise
GEN(g2) failed to push request: broken promise
GEN(g2) requests generated=0, will sleep for 6ms
PRO(p1) processing took: 42.735ms

В агенте receiver самое интересное, пожалуй, это его метод so_define_agent() в котором, в общем-то, описывается вся логика работы этого агента:

virtual void
so_define_agent() override
{
   this >>= st_not_full;

   // When in the normal state...
   st_not_full
      // Store new request in ordinary way...
      .event( &a_receiver_t::evt_store_request )
      // Return request array to processor.
      .event< msg_take_requests >( &a_receiver_t::evt_take_requests );

   // When overload...
   st_overload
      // Reject any new request...
      .event( &a_receiver_t::evt_reject_request )
      // But return request array to processer as usual.
      .event< msg_take_requests >( &a_receiver_t::evt_take_requests );
}

Т.е. агент располагает двумя состояниями: st_not_full, в котором он принимает новые запросы, и st_overload, в котором новые запросы игнорируются. Переход из st_not_full в st_overload происходит, когда это необходимо, внутри обработчика нового запроса:

bool
evt_store_request( const application_request & what )
{
   // Just store new request.
   m_requests.push_back( what );

   if( m_requests.size() < max_capacity )
      // Not overloaded, new requests could be accepted.
      return true;
   else
   {
      // Overloaded. New requests will be rejected.
      this >>= st_overload;
      return false;
   }
}

Возврат в st_not_full осуществляется когда агент receiver отдает свой буфер processor-у для обработки:

std::vector< application_request >
evt_take_requests()
{
   // Value to return.
   std::vector< application_request > result;
   result.swap( m_requests );

   TRACE() << "REC(" << m_name << ") takes requests off, count: "
         << result.size() << std::endl;

   // Preparation to the next turn.
   m_requests.reserve( max_capacity );
   this >>= st_not_full;

   // Return request array to producer.
   return result;
}

Агент processor тривиальнее, чем все остальные агенты этого примера. Он так же работает по тактам. В начале каждого такта он пытается забрать буфер у receiver-а. Если этот буфер оказывается пустым, агент засыпает на некоторое время, после чего пытается начать новый такт, т.е. опять забирает буфер у receiver-а. Если же буфер не пуст, то агент processor имитирует обработку запросов, после чего пытается начать новый такт:

void
evt_next_turn()
{
   // Take requests from receiver.
   auto requests = take_requests();

   if( requests.empty() )
   {
      // No requests. There is no sense to make next request
      // immediately. It is better to take some time to generators
      // and receiver.
      TRACE() << "PRO(" << m_name << ") no request received, sleeping"
            << std::endl;
      so_5::send_delayed_to_agent< msg_next_turn >(
            *this, std::chrono::milliseconds( 25 ) );
   }
   else
   {
      // There are some requests. They must be processed.
      process_requests( requests );
      // Start next turn immediately.
      so_5::send_to_agent< msg_next_turn >( *this );
   }
}

Еще один момент в примере, на который хотелось бы обратить внимание читателей, т.к. раньше на этом особого акцента при обсуждении работы с SObjectizer-ом не делалось. Этот момент -- отсутствие в коде агентов какой-либо обработки завершения работы примера. Т.е. нигде нет никаких флагов необходимости завершения работы, нигде нет никаких проверок того, что работу нужно завершить. Тем не менее, все корректно завершается и нигде ничего не зависает, не зацикливается. Происходит это благодаря тому, что циклами обработки сообщений управляет SObjectizer. При завершении работы приложения эти циклы прерывается, агенты перестают получать новые сообщения и их работа завершается.

Вот, пожалуй, и все основные моменты этого примера. Если у читателей останутся какие-то вопросы или недопонимания, то с удовольствием отвечу/поясню/проясню в комментариях.


Ну а это полный исходный текст примера:

/*
 * A simple example of work load generation and simple form
 * of overload control based on SObjectizer features.
 */

#include <iostream>
#include <random>

#include <so_5/all.hpp>

// A helpers for trace messages.
std::mutex g_trace_mutex;
class locker_t
{
public :
   locker_t( std::mutex & m ) : m_lock( m ) {}
   locker_t( locker_t && o ) : m_lock( std::move( o.m_lock ) ) {}
   operator bool() const { return true; }
private :
   std::unique_lock< std::mutex > m_lock;
};

#define TRACE() \
ifauto l = locker_t{ g_trace_mutex } ) std::cout  

// Helper class with fatilities to random numbers generation.
class random_generator_mixin_t
{
public :
   random_generator_mixin_t()
   {
      m_random_engine.seed( std::hash<decltype(this)>()(this) );
   }

   int
   random( int l, int h )
   {
      return std::uniform_int_distribution<>( l, h )( m_random_engine );
   }

private :
   // Engine for random values generation.
   std::default_random_engine m_random_engine;
};

// Message to be processed by worker agent.
struct application_request : public so_5::rt::message_t
{
   std::string m_to;
   std::string m_from;
   std::string m_payload;
   std::string m_attributes;
   std::string m_generator;

   application_request(
      std::string to,
      std::string from,
      std::string payload,
      std::string attributes,
      std::string generator )
      :  m_to( std::move( to ) )
      ,  m_from( std::move( from ) )
      ,  m_payload( std::move( payload ) )
      ,  m_attributes( std::move( attributes ) )
      ,  m_generator( std::move( generator ) )
   {}
};

// Load generation agent.
class a_generator_t : public so_5::rt::agent_t,
   private random_generator_mixin_t
{
public :
   a_generator_t(
      // Environment to work in.
      so_5::rt::environment_t & env,
      // Name of generator.
      std::string name,
      // Workers.
      const std::vector< so_5::rt::mbox_t > & workers_mboxes )
      :  so_5::rt::agent_t( env )
      ,  m_name( std::move( name ) )
      ,  m_workers_mboxes( workers_mboxes )
   {}

   virtual void
   so_define_agent() override
   {
      // Just one handler in one state.
      so_default_state().event< msg_next_turn >(
            &a_generator_t::evt_next_turn );
   }

   virtual void
   so_evt_start() override
   {
      // Start work cycle.
      so_5::send_to_agent< msg_next_turn >( *this );
   }

private :
   // Signal about next turn start.
   struct msg_next_turn : public so_5::rt::signal_t {};

   // Generator name.
   const std::string m_name;
   // Workers.
   const std::vector< so_5::rt::mbox_t > m_workers_mboxes;

   void
   evt_next_turn()
   {
      // How many requests will be sent on this turn.
      const int requests = random( 1100 );

      TRACE() << "GEN(" << m_name << ") turn started, requests="
            << requests << std::endl;

      // We need copy of workers list to modify it if
      // some worker rejects our requests.
      std::vector< so_5::rt::mbox_t > live_workers( m_workers_mboxes );
      int sent = 0;
      // If there is no active workers there is no need to continue.
      while( sent < requests && !live_workers.empty() )
      {
         if( generate_next_request( live_workers ) )
            ++sent;
      }

      // How much to sleep until next turn.
      const auto next_turn_pause = std::chrono::milliseconds( random(050) );

      TRACE() << "GEN(" << m_name << ") requests generated="
            << sent << ", will sleep for "
            << next_turn_pause.count() << "ms" << std::endl;

      so_5::send_delayed_to_agent< msg_next_turn >( *this, next_turn_pause );
   }

   bool
   generate_next_request( std::vector< so_5::rt::mbox_t > & workers )
   {
      auto it = workers.begin();
      if( workers.size() > 1 )
         // There are more then one live workers. Select one randomly.
         std::advance( it, random( 0, workers.size() - 1 ) );

      // Prepare request.
      auto request = std::unique_ptr< application_request >(
            new application_request(
                  "Mr.Alexander Graham Bell",
                  "Mr.Thomas A. Watson",
                  "Mr. Watson - Come here - I want to see you",
                  "BestEffort,InMemory,NormalPriority",
                  m_name ) );

      // Send it to worker.
      auto result = push_request_to_receiver( *it, std::move( request ) );
      if( !result )
         workers.erase( it );

      return result;
   }

   bool
   push_request_to_receiver(
      const so_5::rt::mbox_t & to,
      std::unique_ptr< application_request > req )
   {
      // There is a plenty of room for any errors related to
      // synchronous invocation. Catch them all and treat them
      // as inabillity of worker to process request.
      try
      {
         return to->get_one< bool >()
               .wait_for( std::chrono::milliseconds( 10 ) )
               .sync_get( std::move( req ) );
      }
      catchconst std::exception & x )
      {
         TRACE()<< "GEN(" << m_name << ") failed to push request: "
               << x.what() << std::endl;
      }

      return false;
   }
};

// Load receiver agent.
class a_receiver_t : public so_5::rt::agent_t
{
public :
   // A signal to take the collected requests to processor.
   struct msg_take_requests : public so_5::rt::signal_t {};

   a_receiver_t(
      // Environment to work in.
      so_5::rt::environment_t & env,
      // Receiver's name.
      std::string name,
      // Max capacity of receiver
      std::size_t max_receiver_capacity )
      :  so_5::rt::agent_t( env )
      ,  m_name( std::move( name ) )
      ,  max_capacity( max_receiver_capacity )
   {
      m_requests.reserve( max_capacity );
   }

   virtual void
   so_define_agent() override
   {
      this >>= st_not_full;

      // When in the normal state...
      st_not_full
         // Store new request in ordinary way...
         .event( &a_receiver_t::evt_store_request )
         // Return request array to processor.
         .event< msg_take_requests >( &a_receiver_t::evt_take_requests );

      // When overload...
      st_overload
         // Reject any new request...
         .event( &a_receiver_t::evt_reject_request )
         // But return request array to processer as usual.
         .event< msg_take_requests >( &a_receiver_t::evt_take_requests );
   }

private :
   const so_5::rt::state_t st_not_full = so_make_state();
   const so_5::rt::state_t st_overload = so_make_state();

   // Receiver's name.
   const std::string m_name;

   // Max count of items to store between processing turns.
   const std::size_t max_capacity;

   // Storage for requests between turns.
   std::vector< application_request > m_requests;

   bool
   evt_store_request( const application_request & what )
   {
      // Just store new request.
      m_requests.push_back( what );

      if( m_requests.size() < max_capacity )
         // Not overloaded, new requests could be accepted.
         return true;
      else
      {
         // Overloaded. New requests will be rejected.
         this >>= st_overload;
         return false;
      }
   }

   bool
   evt_reject_request( const application_request & what )
   {
      TRACE() << "REC(" << m_name << ") reject request from "
            << what.m_generator << std::endl;
      return false;
   }

   std::vector< application_request >
   evt_take_requests()
   {
      // Value to return.
      std::vector< application_request > result;
      result.swap( m_requests );

      TRACE() << "REC(" << m_name << ") takes requests off, count: "
            << result.size() << std::endl;

      // Preparation to the next turn.
      m_requests.reserve( max_capacity );
      this >>= st_not_full;

      // Return request array to producer.
      return result;
   }
};

// Load processor agent.
class a_processor_t : public so_5::rt::agent_t,
   private random_generator_mixin_t
{
public :
   a_processor_t(
      // Environment to work in.
      so_5::rt::environment_t & env,
      // Processor's name.
      std::string name,
      // Receiver mbox.
      const so_5::rt::mbox_t & receiver )
      :  so_5::rt::agent_t( env )
      ,  m_name( std::move( name ) )
      ,  m_receiver( receiver )
   {}

   virtual void
   so_define_agent() override
   {
      // Just one handler in the default state.
      so_default_state().event< msg_next_turn >(
            &a_processor_t::evt_next_turn );
   }

   virtual void
   so_evt_start() override
   {
      // Start working cycle.
      so_5::send_to_agent< msg_next_turn >( *this );
   }

private :
   // Start of next processing turn signal.
   struct msg_next_turn : public so_5::rt::signal_t {};

   // Processor name.
   const std::string m_name;

   // Receiver.
   const so_5::rt::mbox_t m_receiver;

   void
   evt_next_turn()
   {
      // Take requests from receiver.
      auto requests = take_requests();

      if( requests.empty() )
      {
         // No requests. There is no sense to make next request
         // immediately. It is better to take some time to generators
         // and receiver.
         TRACE() << "PRO(" << m_name << ") no request received, sleeping"
               << std::endl;
         so_5::send_delayed_to_agent< msg_next_turn >(
               *this, std::chrono::milliseconds( 25 ) );
      }
      else
      {
         // There are some requests. They must be processed.
         process_requests( requests );
         // Start next turn immediately.
         so_5::send_to_agent< msg_next_turn >( *this );
      }
   }

   std::vector< application_request >
   take_requests()
   {
      // There is a plenty of room for any errors related to
      // synchronous invocation. Catch them all and treat them
      // as inabillity of receiver to provide request array.
      try
      {
         return m_receiver->
               get_one< std::vector< application_request > >()
               .wait_for( std::chrono::milliseconds( 20 ) )
               .sync_get< a_receiver_t::msg_take_requests >();
      }
      catchconst std::exception & x )
      {
         TRACE() << "PRO(" << m_name << ") failed to take requests: "
               << x.what() << std::endl;
      }

      return std::vector< application_request >();
   }

   void
   process_requests( const std::vector< application_request > & requests )
   {
      TRACE() << "PRO(" << m_name << ") start processing, requests="
            << requests.size() << std::endl;

      // Just imitation of requests processing.
      // Processing time is proportional to count of requests.
      const auto processing_time = std::chrono::microseconds(
            requests.size() * random( 1501500 ) );
      std::this_thread::sleep_for( processing_time );

      TRACE() << "PRO(" << m_name << ") processing took: "
            << processing_time.count() / 1000.0 << "ms" << std::endl;
   }
};

std::vector< so_5::rt::mbox_t >
create_processing_coops( so_5::rt::environment_t & env )
{
   std::vector< so_5::rt::mbox_t > result;

   std::size_t capacities[] = { 2535401520 };

   int i = 0;
   forauto c : capacities )
   {
      auto coop = env.create_coop( so_5::autoname );

      auto receiver = std::unique_ptr< a_receiver_t >(
            new a_receiver_t( env, "r" + std::to_string(i), c ) );

      auto receiver_mbox = receiver->so_direct_mbox();
      result.push_back( receiver_mbox );

      auto processor = std::unique_ptr< a_processor_t >(
            new a_processor_t( env, "p" + std::to_string(i), receiver_mbox ) );

      coop->add_agent( std::move( receiver ),
            so_5::disp::thread_pool::create_disp_binder(
                  "receivers",
                  so_5::disp::thread_pool::params_t() ) );

      coop->add_agent( std::move( processor ),
            so_5::disp::active_obj::create_disp_binder( "processors" ) );

      env.register_coop( std::move( coop ) );

      ++i;
   }

   return result;
}

void
init( so_5::rt::environment_t & env )
{
   auto receivers = create_processing_coops( env );

   auto coop = env.create_coop( so_5::autoname,
         so_5::disp::thread_pool::create_disp_binder(
               "generators",
               []( so_5::disp::thread_pool::params_t & p ) {
                  p.fifo( so_5::disp::thread_pool::fifo_t::individual );
               } ) );

   forint i = 0; i != 3; ++i )
   {
      auto agent = std::unique_ptr< a_generator_t >( 
            new a_generator_t( env, "g" + std::to_string(i), receivers ) );
      coop->add_agent( std::move( agent ) );
   }

   env.register_coop( std::move( coop ) );

   std::this_thread::sleep_for( std::chrono::seconds( 10 ) );
   env.stop();
}

int main()
{
   try
   {
      so_5::launch( init,
            []( so_5::rt::environment_params_t & params ) {
               params.add_named_dispatcher( "generators",
                     so_5::disp::thread_pool::create_disp( 3 ) );
               params.add_named_dispatcher( "receivers",
                     so_5::disp::thread_pool::create_disp( 2 ) );
               params.add_named_dispatcher( "processors",
                     so_5::disp::active_obj::create_disp() );
            } );
   }
   catchconst std::exception & ex )
   {
      std::cerr << "Error: " << ex.what() << std::endl;
      return 1;
   }

   return 0;
}

Комментариев нет: