понедельник, 20 июля 2015 г.

[prog.c++11] Прогресс в разработке SObjectizer-5.6.0. Часть 1: приоритеты доставки

С момента релиза версии 5.5.5, после чего, собственно и началась работа над версией 5.6.0, прошло чуть меньше двух месяцев. Из этих двух месяцев по тем или иным причинам выпало полторы или две недели. Но остальное время было направленно на работу над SObjectizer (включая, правда, и рассказы о возможностях SO-5 на профильных форумах, что так же оказывается трудоемким процессом). Первоначальных задумок для 5.6.0 было много. Но что из этого имеет шанс быть воплощенным в коде и в каком именно виде? Об этом речь пойдет в небольшой серии постов.


Основной фишкой версии 5.6.0 декларировалась реализация приоритетов доставки сообщений.

SO-5 изначально не поддерживал приоритетов для событий агентов. Все сообщения, которые попали в очередь агента, извлекались и обрабатывались с одинаковым приоритетом. Имхо, это было оправданным решением на первых этапах развития SO-5. Но по мере развития продукта он должен обрастать новыми возможностями, дабы упрощать жизнь пользователям. Приоритеты доставки как раз являются такой возможностью.

Однако оказалось, что встроить приоритеты событий в архитектуру, которая создавалась без учета оных, очень и очень непросто. Несмотря на несколько заходов с разных сторон мне так и не удалось найти способ нормальной интеграции этой фичи в уже имеющуюся функциональность. Слишком много вещей "попадают под раздачу" при попытке добавить приоритеты: разные типы mbox-ов, механизмы подписки, фильтры доставки и типы диспетчеров. Все это, конечно, можно перелопатить, раздув объем кода SObjectizer в несколько раз. Но есть большое опасение, что в итоге получится весьма хрупкая конструкция. Причем это касается как реализации SObjectizer (слишком много частных случаев, которые придется обрабатывать в коде), так и идеологии использования SObjectizer. Последнее хуже всего, т.к. уже неоднократно встречались высказывания о том, что SObjectizer слишком сложен и непривычен при первом знакомстве. Усугубить это еще и не очевидными правилами поведения агентов в ситуациях с приоритетами и без приоритетов крайне не хотелось.

Поэтому сейчас в качестве рабочего варианта рассматривается очень примитивный и, лично для меня неожиданный, подход: приоритет задается не для события агента, а для всего агента целиком.

Т.е. вместо того, чтобы задавать приоритет P1 событию e1, приоритет P2 событию e2 и т.д. для одного и того же агента, пользователь будет задавать приоритет P1 агенту a1 и приоритет P2 агенту a2. Это означает, что все события агента a1 имеют приоритет P1, а все события агента a2 -- приоритет P2.

Приоритетное обслуживание заявок обеспечивают не все диспетчеры. Будет разработан новый диспетчер с одной рабочей нитью под условным названием prio_one_thread. Приоритет задается при привязке агентов к этому диспетчеру. Что-то вроде:

// Диспетчер с поддержкой приоритетов.
auto prio_disp = so_5::disp::prio_one_thread::create_private_disp(env);
// Создание агентов с привязкой к диспетчеру и назначением приоритета.
coop->make_agent_with_binder< first_agent >(
      // Нулевой приоритет.
      prio_disp->binder( so_5::prio::p0 ), ... );
coop->make_agent_with_binder< second_agent >(
      // Более высокий приоритет.
      prio_disp->binder( so_5::prio::p1 ), ... );

В этом случае для агента вообще ничего не меняется. Вся его внутренняя кухня (подписка, фильтры доставки и т.д.) остается точно такой же, как и прежде. Просто агенту выделяется очередь сообщений у которой есть свой приоритет внутри диспетчера. Соответственно, prio_one_thread-диспетчер владеет N очередями, по одной на каждый приоритет. И обрабатывает заявки начиная с очереди с самым высоким приоритетом. Когда эта очередь опустошается, диспетчер переходит к обслуживанию заявок из очереди с более низким приоритетом. И т.д. Если при обработке очереди с приоритетом Pi обнаруживается, что появилась заявка в очереди с более высоким приоритетом (например, P(i+1)), то обработка очереди Pi приостанавливается и стартует обработка очереди P(i+1).


Такой подход позволяет решать различные задачи.

Скажем, если есть сообщение change_params, которое изменяет параметры обработки остальных сообщений и на это сообщение нужно реагировать как можно быстрее, а после его получения обрабатывать поток сообщений уже с новыми параметрами.

Такого рода сообщение change_params может появиться, например, в результате обнаружения перегрузки агента (т.е. когда в очереди агента оказывается больше сообщений, чем он в состоянии нормально обработать). В этом случае параметры обработки могут быть изменены так, чтобы вместо полноценного обслуживания заявок сразу генерировался отрицательный ответ о том, что система перегружена. Т.е. сообщение change_params, имеющее более высокий приоритет, может быть частью механизма overload control.

Решается эта задача за счет двух агентов: первый, с приоритетом P1, обрабатывает сообщение change_params и изменяет параметры обработки остального потока сообщений. И второй, с приоритетом P0, который обрабатывает все остальное. Оба агента привязаны к одному и тому же экземпляру диспетчера prio_one_thread.

Еще один сценарий: обработка разных потоков сообщений с разным приоритетом. Например, есть сообщения timeout_expired, action_result и retry, которые должны обрабатываться с более высоким приоритетом, чем сообщения new_request, tell_status. И есть еще поток сообщений remove_too_old, которые нужно обрабатывать с самым низким приоритетом.

Здесь потребуются три агента. Первый, с приоритетом P2, обрабатывает timeout_expired, action_resul и retry. Второй, с приоритетом P1, обрабатывает new_request и tell_status. А третий, с приоритетом P0, -- сообщение remove_too_old. Все три агента привязываются к одному и тому же экземпляру диспетчера prio_one_thread.


Назначение приоритетов на уровне агента целиком означает, что если нужно выстроить обработку потока сообщений с разными приоритетами, то обработка будет разделена между несколькими агентами. А агенты эти, скорее всего, будут нуждаться в общих данных. Вероятнее всего общих мутабельных данных. Соответственно, возникает вопрос, как быть с защитой этих данных при доступе к ним из разных агентов.

В принципе, ничего с этим делать не нужно :)

Приоритетная обработка имеет смысл, если все агенты привязаны к одному и тому же экземпляру prio_one_thread-диспетчера. А если так, то сам диспетчер гарантирует, что работать будет всего один агент. Следовательно, если какой-то блок данных разделен между несколькими агентами, то конкурентного доступа к этим данным просто не будет.

При таком подходе разве что увеличится объем кода. Если раньше всю обработку делал один агент и у него внутри могло быть что-то вроде:

class request_processor : public so_5::rt::agent_t
{
   // Параметры обработки.
   processing_params_t m_params;
   // Текущий список запросов, находящихся в обработке.
   active_request_list_t m_active_requests;
   ...
};

То теперь m_params и m_active_requests может потребоваться разделить между несколькими агентами. Что можно сделать, скажем, через shared_ptr:

struct common_data
{
   // Параметры обработки.
   processing_params_t m_params;
   // Текущий список запросов, находящихся в обработке.
   active_request_list_t m_active_requests;
   ...
};

class params_manager : public so_5::rt::agent_t
{
   std::shared_ptr< common_data > m_data;
   ...
};

class request_processor : public so_5::rt::agent_t
{
   std::shared_ptr< common_data > m_data;
   ...
};

// Общие данные для агентов.
auto data = std::make_shared< common_data >();
// Создание агентов и назначение им разных приоритетов.
auto prio_disp = so_5::disp::prio_one_thread::create_private_disp( env );
coop->make_agent_with_binder< params_manager >(
      prio_disp->binder( so_5::prio::p1 ),
      data );
coop->make_agent_with_binder< request_processor >(
      prio_disp->binder( so_5::prio::p0 ),
      data );

Или же через механизм take_under_control:

struct common_data
{
   // Параметры обработки.
   processing_params_t m_params;
   // Текущий список запросов, находящихся в обработке.
   active_request_list_t m_active_requests;
   ...
};

class params_manager : public so_5::rt::agent_t
{
   common_data & m_data;
   ...
};

class request_processor : public so_5::rt::agent_t
{
   common_data & m_data;
   ...
};

// Общие данные для агентов.
auto & data = *(coop->take_under_control( std::make_unique< common_data >() ));
// Создание агентов и назначение им разных приоритетов.
auto prio_disp = so_5::disp::prio_one_thread::create_private_disp( env );
coop->make_agent_with_binder< params_manager >(
      prio_disp->binder( so_5::prio::p1 ),
      data );
coop->make_agent_with_binder< request_processor >(
      prio_disp->binder( so_5::prio::p0 ),
      data );

Как уже было сказано, приоритеты будет поддерживать только один диспетчер под условным названием prio_one_thread. Все уже существующие диспетчеры приоритетного обслуживания заявок не поддерживают и не будут поддерживать.

Во-первых, приоритетная обработка заявок, исходя из прошлого опыта, является весьма экзотическим случаем. Поэтому не хочется терять производительность диспетчеров, которые используются чаще всего.

Во-вторых, приоритетное обслуживание на таких диспетчерах как thread_pool и adv_thread_pool вообще лишено смысла. Действительно, на пуле потоков весьма сложно обеспечить гарантию старта заявки с приоритетом P1 раньше заявки с приоритетом P0. Зато такие гарантии сами собой появляются у диспетчера с одной рабочей нитью. Поэтому-то приоритетное обслуживание будет реализовывать только prio_one_thread. Тем более, что механизмы приватных диспетчеров, а так же add_dispatcher_if_not_exists, позволяют выстраивать самые причудливые схемы распределения прикладных агентов по диспетчерам.


Все вышесказанное является текущим рабочим приближением, которое выглядит довольно осмысленно и реализуемо. Однако, в граните пока ничего не отлито. Поэтому, если у читателей возникнут замечения/предложения или вообще совершенно альтернативные варианты, то сейчас самое время их услышать. Потом будет поздно ;)

Отправить комментарий