В SObjectizer-е всегда взаимодействие между агентами осуществлялось через асинхронный обмен сообщениями. И это постоянно вызывало вопросы и просьбы добавить в SObjectizer какую-то форму синхронного взаимодействия агентов друг с другом. До сих пор ни одна эта просьба не была удовлетворена :) Основными причинами тому были мое непоколебимое убеждение в том, что единственной идеологически правильной формой взаимодействия является асинхронная, а так же неумение найти способ хоть как-то реализовать синхронные вызовы в SObjectizer-4. Из-за этого, по наследству, отсутствие синхронности перешло и в SObjectizer-5. Но вопросы и сожаления пользователей остались :) Так что мысли продолжали возвращаться к этой теме.
Вчера из-за вынужденного безделья в поезде появилась мысль о том, как можно добавить синхронное взаимодействие агентов в SObjectizer-5 без серьезной переделки ядра SObjectizer-а. В попытках ее зафиксировать были рождены и отвергнуты несколько вариантов. Ниже я опишу тот, на котором я пока остановился.
Синхронное взаимодействие в SObjectizer будет осуществляться посредством "сервисов" (лучшего термина я не придумал). Один агент предоставляет синхронный сервис, другой агент его использует. Главное требование -- поставщик и пользователь синхронного сервиса должны работать на разных рабочих нитях.
Поставщик сервиса -- это агент, который должен быть унаследован не от общего базового типа so_5::rt::agent_t, а от специального базового типа so_5::rt::a_service_provider_t.
Далее, все предоставляемые агентом сервисы должны быть описаны внутри агента в виде специальных атрибутов:
class a_my_service_t : public so_5::rt::a_service_provider_t { typedef so_5::rt::a_service_provider_t base_type_t; private : // Это сервис, который предоставляет агент. so_5::rt::service_handle_t< Result, Argument > svc_my; |
Такие атрибуты должны быть специальным образом инициализированы в конструкторе агента (нужно установить связь между хендлом сервиса и агентом, который реализует данный сервис).
public : // Конструктор для агента. a_my_service_t( so_5::rt::so_environment_t & env ) : base_type_t( env ) // Сервис должен быть связан с агентом. // Метод self_service_provider унаследован от // a_service_provider_t. , svc_my( self_service_provider() ) {} |
Сервис реализуется методами агента, по аналогии с обычными событиями. Но, в отличии от обработчиков событий, сервисные методы должны иметь возвращаемое значение. Кроме того, как и с обработчиками событий, если у агента есть несколько состояний, то в каждом из них сервисный метод может быть свой:
class a_my_service_t : public so_5::rt::a_service_provider_t { private : // Это сервис, который предоставляет агент. so_5::rt::service_handle_t< Result, Argument > svc_my; // Состояния, в которых может находится агент. so_5::rt::agent_state_t st_free; so_5::rt::agent_state_t st_busy; public : ... // Этот обработчик события реализует сервис в состоянии st_free. std::unique_ptr< Result > svc_my_when_free( const so_5::rt::event_data_t< Argument > & a ); // Этот обработчик события реализует сервис в состоянии st_busy. std::unique_ptr< Result > svc_my_when_busy( const so_5::rt::event_data_t< Argument > & a ); }; |
Увязать все это вместе (т.е. хендл сервиса, состояние агента и сервисные методы) разработчик должен в штатном методе so_define_agent по аналогии с подпиской событий на сообщения:
void a_my_service_t::so_define_agent() { so_define_service( svc_my ) .implement( st_free, &a_my_service_t::svc_my_when_free ) .implement( st_busy, &a_my_service_t::svc_my_when_busy ); // Остальные действия... ... } |
Метод so_define_agent вызывается у агентов в процессе выполнения регистрации кооперации, в которую входит агент. Далее SObjectizer распределяет агентов по их рабочим нитям (для этого каждому из агентов назначается привязка к тому или иному диспетчеру). На стадии этой привязки диспетчеры будут выполнять дополнительные действия. Диспетчер будет смотреть, является ли агент наследником a_service_provider. И если является, то диспетчер будет брать у агента список его сервисов (список объектов-хендлов) и каждому хендлу будет указывать идентификатор рабочей нити агента. Этот идентификатор затем будет использоваться для защиты от дедлоков.
Для того, чтобы какой-то агент мог использовать синхронный сервис, должен быть доступен хендл этого сервиса. Т.е. ссылка/указатель на объект типа service_handle_t<R, A>. Каким образом пользователь будет получать этот хендл -- это забота пользователя. Например, показанный выше a_my_service_t может предоставлять метод-getter вида:
so_5::rt::service_handle_t< Result, Argument > a_my_service_t::get_svc_handle() { return svc_my; } |
Или же агент a_my_service_t может отсылать этот хендл в каком-то своем сообщении. Например, когда агент переходит из состояния st_busy в st_free он может отослать сообщение msg_i_am_free, в которое поместит хендл своего сервиса:
struct msg_i_am_free : public so_5::rt::message_t { so_5::rt::service_handle_t< Result, Argument > svc; ... } void a_my_service_t::switch_to_st_free() { so_change_state( st_free ); std::unique_ptr< msg_i_am_free > msg( new msg_i_am_free() ); msg->svc = svc_my; ... m_my_mbox->deliver_message( std::move( msg ) ); } |
Когда пользователь сервиса получает хендл сервиса, он имеет возможность обратиться к сервису двумя способами. Первый способ -- с использованием идиомы future.
// Обработчик уведомления о том, что агент-сервис освободился. void a_service_user_t::evt_service_free( const so_5::rt::event_data_t< msg_i_am_free > & msg ) { // Запрашиваем сервис... so_5::rt::future_t< Result > f = msg->svc.async_request( new Argument(...) ); // ... Выполняем какие-то другие действия и даем сервису // время выполнить наш запрос... ... // Получаем результат запроса. std::unique_ptr< Result > r = f.get(); ... } |
В этом случае синхронность проявляется, по сути, только при обращении к future_t::get(). Полностью же синхронное обращение к сервису выполняется посредством метода sync_request:
// Обработчик уведомления о том, что агент-сервис освободился. void a_service_user_t::evt_service_free( const so_5::rt::event_data_t< msg_i_am_free > & msg ) { // Запрашиваем сервис и сразу получаем результат. std::unique_ptr< Result > r = msg->svc.sync_request( new Argument(...) ); ... } |
Хотя тут напрашивается простая реализация sync_request в виде простой обертки вокруг async_request и future_t::get:
std::unique_ptr< Result > sync_request( std::unique_ptr< Argument > a ) { so_5::rt::future_t< Result > f = async_request( std::move(a) ); return f.get(); } |
Как бы то ни было, и async_request и sync_request дают пользователю SObjectizer возможность, которой не было ранее, а именно: внутри обработчика события обратиться к другому агенту и тут же получить результат этого обращения. До сих пор программистам для этого нужно было разбивать свой обработчик события на два: в первом инициируется обращение к другому агенту, во втором получается и обрабатывается результат обращения. До сих пор нужно делать так:
// Первая часть обработки. void a_some_agent_t::evt_something_happened( event_data_t< SomeInfo > & msg ) { // Какая-то первая часть обработки... ... // Тут пришло время обратиться к другому агенту. m_another_agent_mbox->deliver_message( new SomeRequest(...) ); // На этом обработка заканчивается до ожидания ответа другого агента. } // Вторая часть обработки. void a_some_agent_t::evt_another_agent_reply( event_data_t< SomeReply > & msg ) { // Вторая часть обработки... ... } |
С добавлением описанного выше синхронного взаимодействия можно будет писать так:
void a_some_agent_t::evt_something_happened( event_data_t< SomeInfo > & msg ) { // Какая-то первая часть обработки... ... // Тут пришло время обратиться к другому агенту. std::unique_ptr< SomeReply > r = m_another_agent_svc.sync_request( new SomeRequest(...) ); // Вторая часть обработки... ... } |
Теперь несколько слов о том, как все это может быть реализовано "под капотом". В самом простейшем виде.
Когда агент создает у себя хендл сервиса, то этот хендл автоматически регистрирует в SObjectizer-е анонимный mbox, ссылка на который будет хранится внутри хендла сервиса.
Когда агент связывает сервисные методы с хендлом сервиса в so_define_agent() происходит подписка этого агента на специальные сообщения в заданных состояниях для анонимного mbox-а хендла сервиса.
Когда какой-то агент обращается к сервису происходит отсылка сообщения на mbox хендла сервиса. Это сообщение обычным образом диспетчируется и передается в очередь агента на обработку. Но когда инициируется обработка этого сообщения, происходит несколько больше действий, чем с обычным сообщением:
- вызывается не сервисный метод напрямую, а специальная обертка. Внутри которой вызывается сервисный метод и обрабатывается результат его работы;
- после вызова сервисного метода обертка передает результат его работы в future_t, который связан с вызовом, после чего дает сигнал о том, что сервис обработал.
На стороне пользователя сервиса все самое главное происходит в методе future_t::get(). Там выполняется ожидание сигнала о завершении обработки сервиса или же истечение тайм-аута операции. Тайм-аут нужен для ряда случаев: сервис не был обработан вовсе (предоставляющий сервис агент был дерегистрирован или находился не в том состоянии, очередь заявок к этому агенту была слишком большой) или же произошел деадлок.
Защита от деадлоков -- это отдельный вопрос. В простейшем случае она может быть реализована в виде пары предохранительных мер:
- Проверки того, что пользователь и поставщик сервиса работают на разных нитях (эта проверка может быть сделана в самом начале async_request/sync_requst, т.к. в хендле сервиса хранится идентификатор его рабочей нити). Такая проверка позволит сразу диагностировать деадлоки, возникающие из-за работы на одной нити;
- Тайм-аутов. Истечение времени ожидания ответа может означать наличие более сложного деадлока. Например, агент A обращается к сервису агента B, который обращается к сервису агента C, который обращается к сервису агента A.
Думаю, что для самой первой реализации синхронного взаимодействия этого будет вполне достаточно. Причем работать это будет довольно шустро. В последующих реализациях можно будет перейти к более сложным схемам, например, к хранению для рабочих нитей списков ожидающих их нитей с проверкой наличия циклов в этих списках. Но тут уже нужно будет смотреть на накладные расходы, а так же на актуальность такого точного контроля.
Вот такая черновая схема получилась в общих чертах. Понятно, что дьявол будет скрываться в пока еще не известных деталях. Но, по первым впечатлениям, вариант вполне работоспособный и реализуемый.
Комментариев нет:
Отправить комментарий