пятница, 17 октября 2014 г.

[prog.flame] Асинхронность без порталов и телепортации :)

Вчера вечером в G+ увидел ссылку на статью с Хабра "Асинхронность 2: телепортация сквозь порталы." Просмотрел мельком, понял только то, что на этой полянке можно попробовать потоптаться и с SObjectizer-ом. Сегодня с утра перечитал повнимательнее. Решил посмотреть, как эта же самая задача может быть решена средствами SO-5.5.1. После 2.5 часов работы получилось вот так. Кода много. Но, думается, он не сложнее представленного на Хабре.

Под катом находится более подробное описание того, что и как работает в SObjectizer-овском варианте. Прежде чем заглядывать под кат очень рекомендую посмотреть на Хабровскую статью. Хотя бы на вводную ее часть, дабы условия задачи были понятны, т.к. для экономии места я их не повторяю у себя. Но вообще мне бы хотелось, чтобы читатели проштудировали хабровскую статью более внимательно, чтобы разобраться как работает тамошнее решение. Мне интересно, насколько будут различаться усилия для понимания этих двух вариантов. Лично у меня остались некоторые белые пятна после знакомства с кодом Григория Демченко (в частности, я не понял, кто и что получит в результате, если операция будет отменена из-за истечения тайм-аута). Но, возможно, это из-за того, что очень глубоко в код автора статьи я не погружался.

Итак, поехали смотреть, как же это будет выглядеть на SObjectizer-е.

Прежде всего отмечу, что в SO-варианте нет работы с сетью. С Asio я никогда прежде не работал, в ядре SO-5 поддержки работы с сетью нет, поэтому сетевая активность имитируется отдельным агентом. Имхо, для написанной на коленке демки этого вполне достаточно.

Запросы на поиск строки по ключу и обработку результатов у меня выполняет агент a_requests_producer_t. В Хабровской статье было условие о том, чтобы запросы можно было инициировать с UI-нити. Полагаю, моя реализация агента a_requests_producer_t это условие не нарушает, т.к. сделать диспетчера, который запускает события агента на UI-нити можно, в SO-4 такие диспетчеры были (в частности для Qt). В SO-5.5.1 таких нет, но это лишь потому, что надобности не было.

Агент a_requests_producer_t при старта отсылает себе несколько отложенных сообщений, которые должны имитировать возникновение необходимости в выполнении и отмене запросов:

virtual void
so_evt_start() override
{
   using millisec = std::chrono::milliseconds;

   so_5::send_delayed_to_agent< msg_initiate_request >(
         *this, millisec( 100 ), 1"Hello" );
   so_5::send_delayed_to_agent< msg_initiate_request >(
         *this, millisec( 200 ), 2"Bye" );
   so_5::send_delayed_to_agent< msg_initiate_request >(
         *this, millisec( 300 ), 3"Hello" );
   so_5::send_delayed_to_agent< msg_cancel_request >(
         *this, millisec( 305 ), 3 );
   so_5::send_delayed_to_agent< msg_initiate_request >(
         *this, millisec( 400 ), 4"Hello" );

   so_5::send_delayed_to_agent< msg_finish >(
         *this, std::chrono::seconds( 3 ) );
}

Т.е. сначала будет сделан запрос на поиск строки по ключу "Hello", затем "Bye", затем еще два раза для строки "Hello". У каждого запроса должен быть уникальный целочисленный идентификатор, посредством которого запросы будут отменяться. Ну и вообще такой идентификатор, как показывает практика, очень удобен в системах, где работа производится посредством массового обмена сообщениями между сущностями. В данном же примере этот идентификатор еще и показывает на какой именно запрос был получен ответ.

Для демонстрации отмены операции инициируется прерывание запроса с идентификатором 3 -- это один из запросов для строки "Hello".

Так же агент отсылает самому себе отложенный сигнал msg_finish для того, чтобы завершить работу примера через три секунды. За это время все запросы должны быть либо обработаны, либо отменены из-за тайм-аутов.

Отсылаемые агентом a_requests_producer_t сообщения обрабатываются им так, как показано ниже (все обработчики оформлены в виде lambda-функций, т.к. они очень маленькие и нет смысла выносить их в отдельные методы класса). Небольшое пояснение. Виртуальный метод so_define_agent специально предназначен для того, чтобы дать агенту возможность настроить свои подписки перед началом работы в SObjectizer RunTime. SObjectizer вызывает этот метод в процессе регистрации агента.

virtual void
so_define_agent() override
{
   so_default_state()
      .event( [=]( const msg_initiate_request & evt ) {
            TRACE() << "initiate request for: (" << evt.m_id << ":"
                  << evt.m_key << ")" << std::endl;

            m_request_conductor->initiate_request(
                  evt.m_id, evt.m_key, so_direct_mbox() );
         } )
      .event( [=]( const msg_cancel_request & evt ) {
            TRACE() << "cancel request for id: " << evt.m_id << std::endl;

            m_request_conductor->cancel_request( evt.m_id );
         } )
      .event( [=]( const msg_value_response & evt ) {
            TRACE() << "response for: (" << evt.m_id << ":"
                  << evt.m_key << ") = " << evt.m_value << std::endl;
         } )
      .event< msg_finish >( [=] { so_environment().stop(); } );
}

Агент a_requests_producer_t использует свое дефолтное состояние, в котором и обрабатывает все свои сообщения.

Для того, чтобы выполнить и/или отменить запрос предназначен вспомогательный класс request_conductor_t. Этот класс хранит контекст, необходимый для выполнения запроса (к этому контексту я еще вернусь) и предоставляет два метода: initiate_request для запуска асинхронного выполнения нового запроса и cancel_request для отмены ранее начатого запроса.

Код класса request_conductor_t небольшой и понятный, нужно лишь пояснить одну простую штуку. Для выполнения запроса создается агент a_request_performer_t. Этот новый агент регистрируется в SObjectizer как новая кооперация, имя которой строится на основе идентификатора запроса. Это нужно для того, чтобы иметь возможность отменять операции. Отмена заключается в простой дерегистрации (т.е. изъятии агента из SObjectizer-а). Т.е. когда инициатор запроса решает отменить запрос и дергает cancel_request, то request_producer_t дерегистрирует созданную для выполнения этого запроса кооперацию. Соответствующий агент исчезает и инициатор запроса никакого ответа не получит. Ну а вот и весь код request_conductor_t:

class request_conductor_t
{
public :
   request_conductor_t(
      so_5::rt::environment_t & env,
      const performers_info_ptr_t & performers )
      :  m_env( env )
      ,  m_performers( performers )
   {}

   void
   initiate_request(
      long long id,
      const std::string & key,
      const so_5::rt::mbox_t & reply_to )
   {
      auto request_performer = std::unique_ptr< so_5::rt::agent_t >(
            new a_request_performer_t(
                  m_env, m_performers, id, key, reply_to ) );

      m_env.register_agent_as_coop(
            make_coop_name( id ),
            std::move( request_performer ),
            so_5::disp::thread_pool::create_disp_binder(
                  "cpu",
                  so_5::disp::thread_pool::params_t() ) );
   }

   void
   cancel_request(
      long long id )
   {
      m_env.deregister_coop( make_coop_name( id ),
            so_5::rt::dereg_reason::normal );
   }

private :
   so_5::rt::environment_t & m_env;
   const performers_info_ptr_t m_performers;

   std::string
   make_coop_name( long long id )
   {
      return "request_" + std::to_string( id );
   }
};

Для того, чтобы перейти к рассмотрению агента a_request_performer_t нужно сначала обсудить два вспомогательных момента.

Первый момент -- это структура performers_info_t:

struct performers_info_t
{
   const so_5::rt::mbox_t m_mem_cache;
   const so_5::rt::mbox_t m_disk_cache;
   const so_5::rt::mbox_t m_network;

   performers_info_t(
      so_5::rt::mbox_t mem_cache,
      so_5::rt::mbox_t disk_cache,
      so_5::rt::mbox_t network )
      :  m_mem_cache( std::move( mem_cache ) )
      ,  m_disk_cache( std::move( disk_cache ) )
      ,  m_network( std::move( network ) )
   {}
};

Она нужна потому, что в SObjectizer для обмена сообщениями между агентами нужно знать mbox-ы (почтовые ящики), в которые сообщения следует отсылать. Как раз экземпляр структуры performers_info_t и хранит ссылки на mbox сущностей, с которыми нужно провзаимодействовать для выполнения запроса: два кэша (дисковый и в памяти) + агент для взаимодействия с сетью. В коде из Хабровской статьи ничего подобного нет, т.к. там индентификация сущностей выполнена в коде другим способом -- через порталы, например:

boost::optional<std::string> result = goAnyResult<std::string>({
   [&key] {
      return portal<DiskCache>()->get(key);
   }, [&key] {
      return portal<MemCache>()->get(key);
   }
});

Я сейчас не буду углублять в то, какой способ лучше, а какой хуже, это сильно зависит от внешних условий. Просто факт в том, что в SObjectizer нельзя написать так, как сделано у автора статьи. Поэтому потребовалось иметь performers_info_t.

Второй момент, который необходимо осветить перед обсуждением агента a_request_performer_t -- это перечень сообщений, которыми обмениваются между собой агенты. Это очень важно, т.к. единственный способ общения агентов между собой -- это обмен сообщениями, поэтому насколько удачно будут подобранны сообщения, настолько просто будет затем разрабатывать и писать агентов.

В данном примере агент a_request_performer_t отсылает следующие сообщения.

Сообщение msg_value_request:

struct msg_value_request : public so_5::rt::message_t
{
   long long m_id;
   const std::string m_key;
   const so_5::rt::mbox_t m_reply_to;

   msg_value_request(
      long long id,
      std::string key,
      so_5::rt::mbox_t reply_to )
      :  m_id( id )
      ,  m_key( std::move( key ) )
      ,  m_reply_to( std::move( reply_to ) )
   {}
};

Отсылается агентам-кэшам и агенту для общения с сетью. Это сообщение указывает получателю, что нужно проверить наличие строки по заданному ключу и ответить о результате поиска на mbox-а, который задан полем msg_value_request::m_reply_to. Т.к. в SObjectizer-решении все выполняется только асинхронными сообщениями, то исполнитель запроса не имеет никакой другой возможности, кроме как отослать результат операции в ответ своим сообщением. Но куда отсылать, ведь запросы могут приходить от разных агентов? Как раз для решения этой задачи и нужно сопровождать запрос обратным адресом отправителя.

Когда агент a_request_performer_t получает ответ (будь то кэш или сеть), он отсылает результат выполнения запроса агенту a_requests_producer_t посредством сообщения msg_value_response:

struct msg_value_response : public so_5::rt::message_t
{
   long long m_id;
   const std::string m_key;
   const std::string m_value;

   msg_value_response(
      long long id,
      std::string key,
      std::string value )
      :  m_id( id )
      ,  m_key( std::move( key ) )
      ,  m_value( std::move( value ) )
   {}
};

Если a_request_performer_t получает ответ от сети, то он отсылает агентам-кэшам сообщение msg_update_cache:

struct msg_update_cache : public so_5::rt::message_t
{
   long long m_id;
   const std::string m_key;
   const std::string m_value;
   const so_5::rt::mbox_t m_reply_to;

   msg_update_cache(
      long long id,
      std::string key,
      std::string value,
      so_5::rt::mbox_t reply_to )
      :  m_id( id )
      ,  m_key( std::move( key ) )
      ,  m_value( std::move( value ) )
      ,  m_reply_to( std::move( reply_to ) )
   {}
};

Есть еще одно сообщение, которое отсылает a_requests_performer_t -- это его собственное сообщение msg_timeout, которое говорит о том, что истекло время для выполнения конкретной операции:

private :
   struct msg_timeout : public so_5::rt::message_t
   {
      const std::string m_what;

      msg_timeout( std::string what ) : m_what( std::move( what ) ) {}
   };

Получает агент a_request_performer_t четыре сообщения. Про одно из них, msg_timeout уже было сказано. Два других сообщения идут от агентов-кэшей:

struct msg_cache_check_result : public so_5::rt::message_t
{
   long long m_id;
   bool m_found = false;
   const std::string m_value;

   msg_cache_check_result( long long id )
      :  m_id( id )
   {}

   msg_cache_check_result( long long id, std::string value )
      :  m_id( id ), m_found( true ), m_value( std::move( value ) )
   {}
};

struct msg_cache_updated : public so_5::rt::message_t
{
   long long m_id;

   msg_cache_updated( long long id ) : m_id( id ) {}
};

Сообщение msg_cache_check_result сообщает о результате проверки наличия строки в кэше. Поскольку Boost я не использую, то для индикации успешности поиска применяется булевый атрибут m_found. Отсюда и два конструктора для msg_cache_check_result: один для случая неудачного поиска, второй -- для удачного. Сообщение же msg_cache_updated является подтверждением того, что кэш завершил свое обновление.

Еще одно сообщение, msg_network_result, агент a_request_performer_t получает от агента для работы с сетью. Это сообщение всегда содержит искомую строку (по крайней мере, я так понял условие задачи из статьи):

struct msg_network_result : public so_5::rt::message_t
{
   long long m_id;
   std::string m_value;

   msg_network_result( long long id, std::string value )
      :  m_id( id ), m_value( std::move( value ) )
   {}
};

Ну а теперь можно перейти к основному агенту -- a_request_performer_t. Его код довольно объемен, но выполняемые им действия тривиальны.

Самое важное в этом агенте -- это три его возможных состояния: st_wait_cache_resp, st_wait_network_resp и st_wait_cache_updates. Начальным состоянием становится st_wait_cache_resp. В этом состоянии агент ждет результатов проверки наличия строки в кэшах. Если строка найдена, то отсылается ответ и агент завершает свою работу. Если же ни в одном из кэшей строки нет, то агент переходит в состояние st_wait_network_resp. В этом состоянии запрос отсылается агенту по работе с сетью и ожидается ответ от агента. Когда ответ приходит, происходит переход в состояние st_wait_cache_updates. В этом состоянии агентам-кэшам дается команда на обновление своего содержимого и ожидаются подтверждения и завершении обновлений. Получив подтверждения a_request_performer_t отсылает ответ инициатору запроса и завершает свою работу. Собственно, вся логика работы показана внутри метода so_define_agent() при подписке событий агента:

virtual void
so_define_agent() override
{
   this >>= st_wait_cache_resp;

   st_wait_cache_resp
      .event( &a_request_performer_t::evt_cache_check_result )
      .event( &a_request_performer_t::evt_timeout );

   st_wait_network_resp
      .event( &a_request_performer_t::evt_network_result )
      .event( &a_request_performer_t::evt_timeout );

   st_wait_cache_updates
      .event( &a_request_performer_t::evt_cache_updated );
}

Из приведенного фрагмента видно, что тайм-ауты агент обрабатывает только когда ждет результаты проверок в кэшах и из сети. Когда же результат из сети был получен и происходит обновление кэшей, тайм-ауты игнорируются.

Кстати, на счет тайм-аутов. Агент a_request_performer_t выставляет два тайм-аута, но при помощи одного и того же типа сообщения. Первый, внутри so_evt_start(), на всю операцию целиком. Второй, внутри initiate_network_request(), только на операции через сеть. В итоге, SObjectizer может иметь два разных экземпляра msg_timeout для одного агента. Какой из них первым к агенту придет, тот и вызовет завершение работы агента.

Так же в коде a_request_performer_t можно отметить код метода so_evt_start() -- этот метод автоматически вызывается SObjectizer-ом сразу после успешной регистрации кооперации агента. В этом примере so_evt_start() задействован для начала обработки запроса: выставление тайм-аута на всю операцию и отправка запросов к двум кэшам.

Теперь можно привести весь код агента a_request_performer_t. Повторюсь, он объемен, но не сложен. Отчасти его объем определяется инфраструктурными вещами. Например, необходимостью декларировать состояния агента в качестве атрибутов агента. Или же необходимостью сохранять внутри агента параметры исходного запроса (m_id, m_key, m_reply_to). Отчасти тем, что непосредственно в агенте сосредоточен код, который у автора Хабровской статьи находится в его библиотечных конструкциях. Например, у меня в явном виде присутствуют m_cache_checks_results и m_cache_update_results, тогда как аналогичные вещи у Григория Демченко запрятаны в реализации библиотечных фунций goWait и goAnyResult.

class a_request_performer_t : public so_5::rt::agent_t
{
public :
   a_request_performer_t(
      so_5::rt::environment_t & env,
      const performers_info_ptr_t & performers,
      long long id,
      std::string key,
      so_5::rt::mbox_t reply_to )
      :  so_5::rt::agent_t( env )
      ,  m_performers( performers )
      ,  m_id( id )
      ,  m_key( std::move( key ) )
      ,  m_reply_to( std::move( reply_to ) )
   {}

   virtual void
   so_define_agent() override
   {
      this >>= st_wait_cache_resp;

      st_wait_cache_resp
         .event( &a_request_performer_t::evt_cache_check_result )
         .event( &a_request_performer_t::evt_timeout );

      st_wait_network_resp
         .event( &a_request_performer_t::evt_network_result )
         .event( &a_request_performer_t::evt_timeout );

      st_wait_cache_updates
         .event( &a_request_performer_t::evt_cache_updated );
   }

   virtual void
   so_evt_start() override
   {
      TRACE() << "(" << m_id << ":" << m_key << ") processing started"
            << std::endl;

      so_5::send_delayed_to_agent< msg_timeout >( *this,
            std::chrono::seconds( 1 ),
            "total operation timeout" );

      so_5::send< msg_value_request >( m_performers->m_mem_cache,
            m_id, m_key, so_direct_mbox() );
      so_5::send< msg_value_request >( m_performers->m_disk_cache,
            m_id, m_key, so_direct_mbox() );
   }

private :
   struct msg_timeout : public so_5::rt::message_t
   {
      const std::string m_what;

      msg_timeout( std::string what ) : m_what( std::move( what ) ) {}
   };

   const so_5::rt::state_t st_wait_cache_resp = so_make_state();
   const so_5::rt::state_t st_wait_network_resp = so_make_state();
   const so_5::rt::state_t st_wait_cache_updates = so_make_state();

   const performers_info_ptr_t m_performers;

   const long long m_id;
   const std::string m_key;
   std::string m_value;

   const so_5::rt::mbox_t m_reply_to;

   unsigned int m_cache_checks_results = 0;
   unsigned int m_cache_update_results = 0;

   static const unsigned int m_cache_count = 2;

   void
   evt_cache_check_result( const msg_cache_check_result & evt )
   {
      ++m_cache_checks_results;

      TRACE() << "(" << m_id << ":" << m_key << ") cache response ("
            << m_cache_checks_results << "/" << m_cache_count << "), found="
            << ( evt.m_found ? "Y" : "N" ) << std::endl;

      if( evt.m_found )
      {
         TRACE() << "(" << m_id << ":" << m_key << ") found in cache"
               << std::endl;
         m_value = evt.m_value;

         return_result_and_finish_work();
      }
      else
      {
         if( m_cache_checks_results == m_cache_count )
            initiate_network_request();
      }
   }

   void
   evt_network_result( const msg_network_result & evt )
   {
      TRACE() << "(" << m_id << ":" << m_key << ") value from network: "
            << evt.m_value << std::endl;

      m_value = evt.m_value;

      this >>= st_wait_cache_updates;

      so_5::send< msg_update_cache >( m_performers->m_mem_cache,
            m_id, m_key, m_value, so_direct_mbox() );
      so_5::send< msg_update_cache >( m_performers->m_disk_cache,
            m_id, m_key, m_value, so_direct_mbox() );
   }

   void
   evt_cache_updated( const msg_cache_updated & )
   {
      ++m_cache_update_results;

      TRACE() << "(" << m_id << ":" << m_key << ") cache updated ("
            << m_cache_update_results << "/" << m_cache_count << ")"
            << std::endl;

      if( m_cache_update_results == m_cache_count )
         return_result_and_finish_work();
   }

   void
   evt_timeout( const msg_timeout & evt )
   {
      TRACE() << "(" << m_id << ":" << m_key << ") timedout: "
            << evt.m_what << std::endl;

      so_deregister_agent_coop_normally();
   }

   void
   initiate_network_request()
   {
      this >>= st_wait_network_resp;

      so_5::send_delayed_to_agent< msg_timeout >( *this,
            std::chrono::milliseconds( 500 ),
            "network operations timeout" );

      so_5::send< msg_value_request >( m_performers->m_network,
            m_id, m_key, so_direct_mbox() );
   }

   void
   return_result_and_finish_work()
   {
      so_5::send< msg_value_response >( m_reply_to, m_id, m_key, m_value );

      so_deregister_agent_coop_normally();
   }
};

Из принципиальных моментов осталось показать процедуру создания и привязки агентов к контекстам.

Нам нужно три разных контекста. Один -- для агента a_requests_producer_t, который будет имитировать UI-нить. Для этого выделен отдельный экземпляр диспетчера one_thread с именем "ui".

Второй диспетчер, под именем "cpu", будет обслуживать кэши и агентов-исполнителей. Под это дело отводится thread_pool диспетчер с тремя рабочими нитями. Кстати говоря, привязку агентов-исполнителей к этому диспетчеру можно увидеть в коде request_conductor_t.

Ну и третий диспетчер для работы с сетью, под именем "net". Здесь не принципиально какой это будет диспетчер, главное, чтобы он предоставлял свой собственный контекст. Поэтому выбран active_obj, хотя можно было бы и one_thread.

Все эти диспетчеры создаются в функции настройки SObjectizer Environment перед стартом:

void
init_env_params( so_5::rt::environment_params_t & params )
{
   params.add_named_dispatcher( "ui",
         so_5::disp::one_thread::create_disp() );

   params.add_named_dispatcher( "cpu",
         so_5::disp::thread_pool::create_disp( 3 ) );

   params.add_named_dispatcher( "net",
         so_5::disp::active_obj::create_disp() );
}

Ну а сами агенты создаются в двух разных кооперациях. Первая кооперация, "performers", содержит агенты-кэши и агента для работы с сетью. Внутри нее два агента-кэша привязываются к диспетчеру "cpu", но при этом указывается, что свои события одни должны обрабатывать независимо друг от друга, что позволяет им работать параллельно (именно для этого задается fifo_t::individual). Третий агент этой кооперации привязывается к диспетчеру "net".

Вторая кооперация состоит всего из одного агента -- a_requests_producer_t. Он так же привязывается с своему собственному диспетчеру.

void
init( so_5::rt::environment_t & env )
{
   so_5::disp::thread_pool::params_t pool_params;
   pool_params.fifo( so_5::disp::thread_pool::fifo_t::individual );

   auto performers_coop = env.create_coop( "performers" );

   auto mem_cache = performers_coop->add_agent(
         new a_cache_t( env, 5150 ),
         so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
   auto disk_cache = performers_coop->add_agent(
         new a_cache_t( env, 50550 ),
         so_5::disp::thread_pool::create_disp_binder( "cpu", pool_params ) );
   auto network = performers_coop->add_agent(
         new a_network_t( env, 150750 ),
         so_5::disp::active_obj::create_disp_binder( "net" ) );

   env.register_coop( std::move( performers_coop ) );

   auto performers_info = std::make_shared< performers_info_t >(
         mem_cache->so_direct_mbox(),
         disk_cache->so_direct_mbox(), 
         network->so_direct_mbox() );

   auto conductor = std::make_shared< request_conductor_t >(
         env, performers_info );

   env.register_agent_as_coop( "initiator",
         new a_requests_producer_t( env, conductor ),
         so_5::disp::one_thread::create_disp_binder( "ui" ) );
}

Ну вот с основными моментами, пожалуй, и все. Теперь можно показать результат одного из запусков этого примера у меня на машине:

initiate request for: (1:Hello)
(1:Hello) processing started
(1:Hello) cache response (1/2), found=N
(1:Hello) cache response (2/2), found=N
(1:Hello) will be requested via network
initiate request for: (2:Bye)
(2:Bye) processing started
(2:Bye) cache response (1/2), found=N
(2:Bye) cache response (2/2), found=N
(2:Bye) will be requested via network
initiate request for: (3:Hello)
(3:Hello) processing started
cancel request for id: 3
(1:Hello) value from network: <Hello>
initiate request for: (4:Hello)
(4:Hello) processing started
(1:Hello) cache updated (1/2)
(4:Hello) cache response (1/2), found=Y
(4:Hello) found in cache
response for: (4:Hello) = <Hello>
(1:Hello) cache updated (2/2)
response for: (1:Hello) = <Hello>
(2:Bye) timedout: network operations timeout

Тут можно видеть, как запрос #2 истекает по тайм-ауту (это из-за того, что кэши и агент для работы с сетью используют случайные тайм-ауты при выдаче результатов). А запрос #3 отменяется и его результатов нигде нет.

Но самое интересное происходит с запросами #1 и #4. Видно, что обработки запроса #1 потребовалось идти в сеть, после чего началось обновление кэшей. И пока это обновление длилось, запрос #4 успел успешно обработаться потому, что в одном из кэшей строка с ключем "Hello" уже обнаружилась. Поэтому инициатор получил ответ на запрос #4 раньше, чем ответ на запрос #1. В принципе, ничего удивительного, асинхронность -- она такая :)

Для полноты картины нужно привести еще код агентов, которые имитируют работу кэшей, а так же агента, имитирующего работу с сетью. В них все довольно тривиально. Ну и, не буду скрывать, агент, имитирующий сеть, стал результатом копипасты агента-кэша:

class a_cache_t : public so_5::rt::agent_t
{
public :
   a_cache_t(
      so_5::rt::environment_t & env,
      unsigned int min_pause,
      unsigned int max_pause )
      :  so_5::rt::agent_t( env )
      ,  m_min_pause( min_pause )
      ,  m_pause_delta( max_pause - min_pause )
   {}

   virtual void
   so_define_agent() override
   {
      so_default_state().event( [=]( const msg_value_request & evt ) {
            auto it = m_values.find( evt.m_key );
            auto delay = random_delay();

            if( it == m_values.end() )
               so_5::send_delayed< msg_cache_check_result >(
                     *this, evt.m_reply_to, delay, evt.m_id );
            else
               so_5::send_delayed< msg_cache_check_result >(
                     *this, evt.m_reply_to, delay, evt.m_id, it->second );
         } );

      so_default_state().event( [=]( const msg_update_cache & evt ) {
            m_values[ evt.m_key ] = evt.m_value;

            so_5::send_delayed< msg_cache_updated >(
                  *this, evt.m_reply_to, random_delay(), evt.m_id );
         } );
   }

private :
   const unsigned int m_min_pause;
   const unsigned int m_pause_delta;

   std::unordered_map< std::string, std::string > m_values;

   std::chrono::milliseconds
   random_delay() const
   {
      return std::chrono::milliseconds( m_min_pause +
            static_castunsigned int >( std::rand() ) % m_pause_delta );
   }
};

class a_network_t : public so_5::rt::agent_t
{
public :
   a_network_t(
      so_5::rt::environment_t & env,
      unsigned int min_pause,
      unsigned int max_pause )
      :  so_5::rt::agent_t( env )
      ,  m_min_pause( min_pause )
      ,  m_pause_delta( max_pause - min_pause )
   {}

   virtual void
   so_define_agent() override
   {
      so_default_state().event( [=]( const msg_value_request & evt ) {
            TRACE() << "(" << evt.m_id << ":" << evt.m_key << ") will be "
                  "requested via network" << std::endl;

            so_5::send_delayed< msg_network_result >(
                  *this, evt.m_reply_to, random_delay(),
                  evt.m_id, "<" + evt.m_key + ">" );
         } );
   }

private :
   const unsigned int m_min_pause;
   const unsigned int m_pause_delta;

   std::chrono::milliseconds
   random_delay() const
   {
      return std::chrono::milliseconds( m_min_pause +
            static_castunsigned int >( std::rand() ) % m_pause_delta );
   }
};

Что остается сказать по итогам?

Меня, конечно, расстраивает многословность SObjectizer-овского решения. Чем больше строк в программе, тем больше мест, где можно допустить ошибку.

С другой стороны, здесь объем получается не из-за сложности. Основная логика более-менее лаконично записывается.

Мне, конечно же, проще разобраться в своем решении. И не только потому, что я хорошо знаю инструмент, над которым работаю. Но и потому, что такой стиль мне ближе, т.к. в свое время на C++ по другому и нельзя было писать. Когда-то давным-давно GUI-приложения (хоть на MFC, хоть на Qt, хоть на wxWidgets) или сетевые вещи на ACE писались именно в таком стиле. Это не могло не отложить свой отпечаток.

Совеременный же код на новом C++ воспринимать сложнее. Может потому, что там ряд непривычных мне аспектов упрятан в детали реализации, но про них нужно знать. Например, о том, что и как происходит при истечении тайм-аута в коде из Хабровской статьи.

В общем, отрадно, что решение на SObjectizer не потребовало много времени (хотя в процессе написания поста пару мелких багов я нашел и поправил). Но общий итог неутешителен. Нужно еще работать и работать.

В заключение хочу сказать спасибо ув.тов.Сергею Галичанину, у которого я нашел ссылку на Хабровскую статью. Ну и, конечно же, благодарю всех, кто дочитал до этого места :) А если кто-то поспособствует распространеннию ссылки на этот пост, то будет вообще замечательно ;)

Отправить комментарий