четверг, 17 октября 2013 г.

[prog.c++] Возможно, в SObjectizer 5.3 появится синхронное взаимодействие агентов

В SObjectizer-е всегда взаимодействие между агентами осуществлялось через асинхронный обмен сообщениями. И это постоянно вызывало вопросы и просьбы добавить в SObjectizer какую-то форму синхронного взаимодействия агентов друг с другом. До сих пор ни одна эта просьба не была удовлетворена :) Основными причинами тому были мое непоколебимое убеждение в том, что единственной идеологически правильной формой взаимодействия является асинхронная, а так же неумение найти способ хоть как-то реализовать синхронные вызовы в SObjectizer-4. Из-за этого, по наследству, отсутствие синхронности перешло и в SObjectizer-5. Но вопросы и сожаления пользователей остались :) Так что мысли продолжали возвращаться к этой теме.

Вчера из-за вынужденного безделья в поезде появилась мысль о том, как можно добавить синхронное взаимодействие агентов в SObjectizer-5 без серьезной переделки ядра SObjectizer-а. В попытках ее зафиксировать были рождены и отвергнуты несколько вариантов. Ниже я опишу тот, на котором я пока остановился.

Синхронное взаимодействие в SObjectizer будет осуществляться посредством "сервисов" (лучшего термина я не придумал). Один агент предоставляет синхронный сервис, другой агент его использует. Главное требование -- поставщик и пользователь синхронного сервиса должны работать на разных рабочих нитях.

Поставщик сервиса -- это агент, который должен быть унаследован не от общего базового типа so_5::rt::agent_t, а от специального базового типа so_5::rt::a_service_provider_t.

Далее, все предоставляемые агентом сервисы должны быть описаны внутри агента в виде специальных атрибутов:

class a_my_service_t : public so_5::rt::a_service_provider_t
{
      typedef so_5::rt::a_service_provider_t base_type_t;

   private :
      // Это сервис, который предоставляет агент.
      so_5::rt::service_handle_t< Result, Argument > svc_my;

Такие атрибуты должны быть специальным образом инициализированы в конструкторе агента (нужно установить связь между хендлом сервиса и агентом, который реализует данный сервис).

   public :
      // Конструктор для агента.
      a_my_service_t(
         so_5::rt::so_environment_t & env )
         :  base_type_t( env )
            // Сервис должен быть связан с агентом.
            // Метод self_service_provider унаследован от
            // a_service_provider_t.
         ,  svc_my( self_service_provider() )
         {}

Сервис реализуется методами агента, по аналогии с обычными событиями. Но, в отличии от обработчиков событий, сервисные методы должны иметь возвращаемое значение. Кроме того, как и с обработчиками событий, если у агента есть несколько состояний, то в каждом из них сервисный метод может быть свой:

class a_my_service_t : public so_5::rt::a_service_provider_t
{
   private :
      // Это сервис, который предоставляет агент.
      so_5::rt::service_handle_t< Result, Argument > svc_my;

      // Состояния, в которых может находится агент.
      so_5::rt::agent_state_t st_free;
      so_5::rt::agent_state_t st_busy;

   public :
      ...

      // Этот обработчик события реализует сервис в состоянии st_free.
      std::unique_ptr< Result >
      svc_my_when_free( const so_5::rt::event_data_t< Argument > & a );

      // Этот обработчик события реализует сервис в состоянии st_busy.
      std::unique_ptr< Result >
      svc_my_when_busy( const so_5::rt::event_data_t< Argument > & a );
};

Увязать все это вместе (т.е. хендл сервиса, состояние агента и сервисные методы) разработчик должен в штатном методе so_define_agent по аналогии с подпиской событий на сообщения:

void
a_my_service_t::so_define_agent()
{
   so_define_service( svc_my )
      .implement( st_free, &a_my_service_t::svc_my_when_free )
      .implement( st_busy, &a_my_service_t::svc_my_when_busy );

   // Остальные действия...
   ...
}

Метод so_define_agent вызывается у агентов в процессе выполнения регистрации кооперации, в которую входит агент. Далее SObjectizer распределяет агентов по их рабочим нитям (для этого каждому из агентов назначается привязка к тому или иному диспетчеру). На стадии этой привязки диспетчеры будут выполнять дополнительные действия. Диспетчер будет смотреть, является ли агент наследником a_service_provider. И если является, то диспетчер будет брать у агента список его сервисов (список объектов-хендлов) и каждому хендлу будет указывать идентификатор рабочей нити агента. Этот идентификатор затем будет использоваться для защиты от дедлоков.

Для того, чтобы какой-то агент мог использовать синхронный сервис, должен быть доступен хендл этого сервиса. Т.е. ссылка/указатель на объект типа service_handle_t<R, A>. Каким образом пользователь будет получать этот хендл -- это забота пользователя. Например, показанный выше a_my_service_t может предоставлять метод-getter вида:

so_5::rt::service_handle_t< Result, Argument >
a_my_service_t::get_svc_handle()
{
   return svc_my;
}

Или же агент a_my_service_t может отсылать этот хендл в каком-то своем сообщении. Например, когда агент переходит из состояния st_busy в st_free он может отослать сообщение msg_i_am_free, в которое поместит хендл своего сервиса:

struct msg_i_am_free : public so_5::rt::message_t
{
   so_5::rt::service_handle_t< Result, Argument > svc;
   ...
}

void
a_my_service_t::switch_to_st_free()
{
   so_change_state( st_free );

   std::unique_ptr< msg_i_am_free > msg( new msg_i_am_free() );
   msg->svc = svc_my;
   ...
   m_my_mbox->deliver_message( std::move( msg ) );
}

Когда пользователь сервиса получает хендл сервиса, он имеет возможность обратиться к сервису двумя способами. Первый способ -- с использованием идиомы future.

// Обработчик уведомления о том, что агент-сервис освободился.
void
a_service_user_t::evt_service_free(
   const so_5::rt::event_data_t< msg_i_am_free > & msg )
{
   // Запрашиваем сервис...
   so_5::rt::future_t< Result > f = msg->svc.async_request( new Argument(...) );
   // ... Выполняем какие-то другие действия и даем сервису
   // время выполнить наш запрос...
   ...
   // Получаем результат запроса.
   std::unique_ptr< Result > r = f.get();
   ... 
}

В этом случае синхронность проявляется, по сути, только при обращении к future_t::get(). Полностью же синхронное обращение к сервису выполняется посредством метода sync_request:

// Обработчик уведомления о том, что агент-сервис освободился.
void
a_service_user_t::evt_service_free(
   const so_5::rt::event_data_t< msg_i_am_free > & msg )
{
   // Запрашиваем сервис и сразу получаем результат.
   std::unique_ptr< Result > r = msg->svc.sync_request( new Argument(...) );
   ...
}

Хотя тут напрашивается простая реализация sync_request в виде простой обертки вокруг async_request и future_t::get:

std::unique_ptr< Result >
sync_request( std::unique_ptr< Argument > a )
{
   so_5::rt::future_t< Result > f = async_request( std::move(a) );
   return f.get();
}

Как бы то ни было, и async_request и sync_request дают пользователю SObjectizer возможность, которой не было ранее, а именно: внутри обработчика события обратиться к другому агенту и тут же получить результат этого обращения. До сих пор программистам для этого нужно было разбивать свой обработчик события на два: в первом инициируется обращение к другому агенту, во втором получается и обрабатывается результат обращения. До сих пор нужно делать так:

// Первая часть обработки.
void
a_some_agent_t::evt_something_happened( event_data_t< SomeInfo > & msg )
{
   // Какая-то первая часть обработки...
   ...
   // Тут пришло время обратиться к другому агенту.
   m_another_agent_mbox->deliver_message( new SomeRequest(...) );
   // На этом обработка заканчивается до ожидания ответа другого агента.
}

// Вторая часть обработки.
void
a_some_agent_t::evt_another_agent_reply( event_data_t< SomeReply > & msg )
{
   // Вторая часть обработки...
   ...
}

С добавлением описанного выше синхронного взаимодействия можно будет писать так:

void
a_some_agent_t::evt_something_happened( event_data_t< SomeInfo > & msg )
{
   // Какая-то первая часть обработки...
   ...
   // Тут пришло время обратиться к другому агенту.
   std::unique_ptr< SomeReply > r = m_another_agent_svc.sync_request(
         new SomeRequest(...) );
   // Вторая часть обработки...
   ...
}

Теперь несколько слов о том, как все это может быть реализовано "под капотом". В самом простейшем виде.

Когда агент создает у себя хендл сервиса, то этот хендл автоматически регистрирует в SObjectizer-е анонимный mbox, ссылка на который будет хранится внутри хендла сервиса.

Когда агент связывает сервисные методы с хендлом сервиса в so_define_agent() происходит подписка этого агента на специальные сообщения в заданных состояниях для анонимного mbox-а хендла сервиса.

Когда какой-то агент обращается к сервису происходит отсылка сообщения на mbox хендла сервиса. Это сообщение обычным образом диспетчируется и передается в очередь агента на обработку. Но когда инициируется обработка этого сообщения, происходит несколько больше действий, чем с обычным сообщением:

  • вызывается не сервисный метод напрямую, а специальная обертка. Внутри которой вызывается сервисный метод и обрабатывается результат его работы;
  • после вызова сервисного метода обертка передает результат его работы в future_t, который связан с вызовом, после чего дает сигнал о том, что сервис обработал.

На стороне пользователя сервиса все самое главное происходит в методе future_t::get(). Там выполняется ожидание сигнала о завершении обработки сервиса или же истечение тайм-аута операции. Тайм-аут нужен для ряда случаев: сервис не был обработан вовсе (предоставляющий сервис агент был дерегистрирован или находился не в том состоянии, очередь заявок к этому агенту была слишком большой) или же произошел деадлок.

Защита от деадлоков -- это отдельный вопрос. В простейшем случае она может быть реализована в виде пары предохранительных мер:

  1. Проверки того, что пользователь и поставщик сервиса работают на разных нитях (эта проверка может быть сделана в самом начале async_request/sync_requst, т.к. в хендле сервиса хранится идентификатор его рабочей нити). Такая проверка позволит сразу диагностировать деадлоки, возникающие из-за работы на одной нити;
  2. Тайм-аутов. Истечение времени ожидания ответа может означать наличие более сложного деадлока. Например, агент A обращается к сервису агента B, который обращается к сервису агента C, который обращается к сервису агента A.

Думаю, что для самой первой реализации синхронного взаимодействия этого будет вполне достаточно. Причем работать это будет довольно шустро. В последующих реализациях можно будет перейти к более сложным схемам, например, к хранению для рабочих нитей списков ожидающих их нитей с проверкой наличия циклов в этих списках. Но тут уже нужно будет смотреть на накладные расходы, а так же на актуальность такого точного контроля.

Вот такая черновая схема получилась в общих чертах. Понятно, что дьявол будет скрываться в пока еще не известных деталях. Но, по первым впечатлениям, вариант вполне работоспособный и реализуемый.

Отправить комментарий