В SObjectizer-е всегда взаимодействие между агентами осуществлялось через асинхронный обмен сообщениями. И это постоянно вызывало вопросы и просьбы добавить в SObjectizer какую-то форму синхронного взаимодействия агентов друг с другом. До сих пор ни одна эта просьба не была удовлетворена :) Основными причинами тому были мое непоколебимое убеждение в том, что единственной идеологически правильной формой взаимодействия является асинхронная, а так же неумение найти способ хоть как-то реализовать синхронные вызовы в SObjectizer-4. Из-за этого, по наследству, отсутствие синхронности перешло и в SObjectizer-5. Но вопросы и сожаления пользователей остались :) Так что мысли продолжали возвращаться к этой теме.
Вчера из-за вынужденного безделья в поезде появилась мысль о том, как можно добавить синхронное взаимодействие агентов в SObjectizer-5 без серьезной переделки ядра SObjectizer-а. В попытках ее зафиксировать были рождены и отвергнуты несколько вариантов. Ниже я опишу тот, на котором я пока остановился.
Синхронное взаимодействие в SObjectizer будет осуществляться посредством "сервисов" (лучшего термина я не придумал). Один агент предоставляет синхронный сервис, другой агент его использует. Главное требование -- поставщик и пользователь синхронного сервиса должны работать на разных рабочих нитях.
Поставщик сервиса -- это агент, который должен быть унаследован не от общего базового типа so_5::rt::agent_t, а от специального базового типа so_5::rt::a_service_provider_t.
Далее, все предоставляемые агентом сервисы должны быть описаны внутри агента в виде специальных атрибутов:
class a_my_service_t : public so_5::rt::a_service_provider_t
{
typedef so_5::rt::a_service_provider_t base_type_t;
private :
// Это сервис, который предоставляет агент.
so_5::rt::service_handle_t< Result, Argument > svc_my;
|
Такие атрибуты должны быть специальным образом инициализированы в конструкторе агента (нужно установить связь между хендлом сервиса и агентом, который реализует данный сервис).
public :
// Конструктор для агента.
a_my_service_t(
so_5::rt::so_environment_t & env )
: base_type_t( env )
// Сервис должен быть связан с агентом.
// Метод self_service_provider унаследован от
// a_service_provider_t.
, svc_my( self_service_provider() )
{}
|
Сервис реализуется методами агента, по аналогии с обычными событиями. Но, в отличии от обработчиков событий, сервисные методы должны иметь возвращаемое значение. Кроме того, как и с обработчиками событий, если у агента есть несколько состояний, то в каждом из них сервисный метод может быть свой:
class a_my_service_t : public so_5::rt::a_service_provider_t
{
private :
// Это сервис, который предоставляет агент.
so_5::rt::service_handle_t< Result, Argument > svc_my;
// Состояния, в которых может находится агент.
so_5::rt::agent_state_t st_free;
so_5::rt::agent_state_t st_busy;
public :
...
// Этот обработчик события реализует сервис в состоянии st_free.
std::unique_ptr< Result >
svc_my_when_free( const so_5::rt::event_data_t< Argument > & a );
// Этот обработчик события реализует сервис в состоянии st_busy.
std::unique_ptr< Result >
svc_my_when_busy( const so_5::rt::event_data_t< Argument > & a );
};
|
Увязать все это вместе (т.е. хендл сервиса, состояние агента и сервисные методы) разработчик должен в штатном методе so_define_agent по аналогии с подпиской событий на сообщения:
void
a_my_service_t::so_define_agent()
{
so_define_service( svc_my )
.implement( st_free, &a_my_service_t::svc_my_when_free )
.implement( st_busy, &a_my_service_t::svc_my_when_busy );
// Остальные действия...
...
}
|
Метод so_define_agent вызывается у агентов в процессе выполнения регистрации кооперации, в которую входит агент. Далее SObjectizer распределяет агентов по их рабочим нитям (для этого каждому из агентов назначается привязка к тому или иному диспетчеру). На стадии этой привязки диспетчеры будут выполнять дополнительные действия. Диспетчер будет смотреть, является ли агент наследником a_service_provider. И если является, то диспетчер будет брать у агента список его сервисов (список объектов-хендлов) и каждому хендлу будет указывать идентификатор рабочей нити агента. Этот идентификатор затем будет использоваться для защиты от дедлоков.
Для того, чтобы какой-то агент мог использовать синхронный сервис, должен быть доступен хендл этого сервиса. Т.е. ссылка/указатель на объект типа service_handle_t<R, A>. Каким образом пользователь будет получать этот хендл -- это забота пользователя. Например, показанный выше a_my_service_t может предоставлять метод-getter вида:
so_5::rt::service_handle_t< Result, Argument >
a_my_service_t::get_svc_handle()
{
return svc_my;
}
|
Или же агент a_my_service_t может отсылать этот хендл в каком-то своем сообщении. Например, когда агент переходит из состояния st_busy в st_free он может отослать сообщение msg_i_am_free, в которое поместит хендл своего сервиса:
struct msg_i_am_free : public so_5::rt::message_t
{
so_5::rt::service_handle_t< Result, Argument > svc;
...
}
void
a_my_service_t::switch_to_st_free()
{
so_change_state( st_free );
std::unique_ptr< msg_i_am_free > msg( new msg_i_am_free() );
msg->svc = svc_my;
...
m_my_mbox->deliver_message( std::move( msg ) );
}
|
Когда пользователь сервиса получает хендл сервиса, он имеет возможность обратиться к сервису двумя способами. Первый способ -- с использованием идиомы future.
// Обработчик уведомления о том, что агент-сервис освободился.
void
a_service_user_t::evt_service_free(
const so_5::rt::event_data_t< msg_i_am_free > & msg )
{
// Запрашиваем сервис...
so_5::rt::future_t< Result > f = msg->svc.async_request( new Argument(...) );
// ... Выполняем какие-то другие действия и даем сервису
// время выполнить наш запрос...
...
// Получаем результат запроса.
std::unique_ptr< Result > r = f.get();
...
}
|
В этом случае синхронность проявляется, по сути, только при обращении к future_t::get(). Полностью же синхронное обращение к сервису выполняется посредством метода sync_request:
// Обработчик уведомления о том, что агент-сервис освободился.
void
a_service_user_t::evt_service_free(
const so_5::rt::event_data_t< msg_i_am_free > & msg )
{
// Запрашиваем сервис и сразу получаем результат.
std::unique_ptr< Result > r = msg->svc.sync_request( new Argument(...) );
...
}
|
Хотя тут напрашивается простая реализация sync_request в виде простой обертки вокруг async_request и future_t::get:
std::unique_ptr< Result >
sync_request( std::unique_ptr< Argument > a )
{
so_5::rt::future_t< Result > f = async_request( std::move(a) );
return f.get();
}
|
Как бы то ни было, и async_request и sync_request дают пользователю SObjectizer возможность, которой не было ранее, а именно: внутри обработчика события обратиться к другому агенту и тут же получить результат этого обращения. До сих пор программистам для этого нужно было разбивать свой обработчик события на два: в первом инициируется обращение к другому агенту, во втором получается и обрабатывается результат обращения. До сих пор нужно делать так:
// Первая часть обработки.
void
a_some_agent_t::evt_something_happened( event_data_t< SomeInfo > & msg )
{
// Какая-то первая часть обработки...
...
// Тут пришло время обратиться к другому агенту.
m_another_agent_mbox->deliver_message( new SomeRequest(...) );
// На этом обработка заканчивается до ожидания ответа другого агента.
}
// Вторая часть обработки.
void
a_some_agent_t::evt_another_agent_reply( event_data_t< SomeReply > & msg )
{
// Вторая часть обработки...
...
}
|
С добавлением описанного выше синхронного взаимодействия можно будет писать так:
void
a_some_agent_t::evt_something_happened( event_data_t< SomeInfo > & msg )
{
// Какая-то первая часть обработки...
...
// Тут пришло время обратиться к другому агенту.
std::unique_ptr< SomeReply > r = m_another_agent_svc.sync_request(
new SomeRequest(...) );
// Вторая часть обработки...
...
}
|
Теперь несколько слов о том, как все это может быть реализовано "под капотом". В самом простейшем виде.
Когда агент создает у себя хендл сервиса, то этот хендл автоматически регистрирует в SObjectizer-е анонимный mbox, ссылка на который будет хранится внутри хендла сервиса.
Когда агент связывает сервисные методы с хендлом сервиса в so_define_agent() происходит подписка этого агента на специальные сообщения в заданных состояниях для анонимного mbox-а хендла сервиса.
Когда какой-то агент обращается к сервису происходит отсылка сообщения на mbox хендла сервиса. Это сообщение обычным образом диспетчируется и передается в очередь агента на обработку. Но когда инициируется обработка этого сообщения, происходит несколько больше действий, чем с обычным сообщением:
- вызывается не сервисный метод напрямую, а специальная обертка. Внутри которой вызывается сервисный метод и обрабатывается результат его работы;
- после вызова сервисного метода обертка передает результат его работы в future_t, который связан с вызовом, после чего дает сигнал о том, что сервис обработал.
На стороне пользователя сервиса все самое главное происходит в методе future_t::get(). Там выполняется ожидание сигнала о завершении обработки сервиса или же истечение тайм-аута операции. Тайм-аут нужен для ряда случаев: сервис не был обработан вовсе (предоставляющий сервис агент был дерегистрирован или находился не в том состоянии, очередь заявок к этому агенту была слишком большой) или же произошел деадлок.
Защита от деадлоков -- это отдельный вопрос. В простейшем случае она может быть реализована в виде пары предохранительных мер:
- Проверки того, что пользователь и поставщик сервиса работают на разных нитях (эта проверка может быть сделана в самом начале async_request/sync_requst, т.к. в хендле сервиса хранится идентификатор его рабочей нити). Такая проверка позволит сразу диагностировать деадлоки, возникающие из-за работы на одной нити;
- Тайм-аутов. Истечение времени ожидания ответа может означать наличие более сложного деадлока. Например, агент A обращается к сервису агента B, который обращается к сервису агента C, который обращается к сервису агента A.
Думаю, что для самой первой реализации синхронного взаимодействия этого будет вполне достаточно. Причем работать это будет довольно шустро. В последующих реализациях можно будет перейти к более сложным схемам, например, к хранению для рабочих нитей списков ожидающих их нитей с проверкой наличия циклов в этих списках. Но тут уже нужно будет смотреть на накладные расходы, а так же на актуальность такого точного контроля.
Вот такая черновая схема получилась в общих чертах. Понятно, что дьявол будет скрываться в пока еще не известных деталях. Но, по первым впечатлениям, вариант вполне работоспособный и реализуемый.