Чем больше думаешь о таких вещах, как защита агентов от перегрузки и обеспечение дедлайнов обработки сообщений, тем больше убеждаешься в том, что разделение на агента-collector-а и агента-performer-а -- это действительно очень простой и действенный путь к нужному результату. Фактически, паттерн для SObjectizer.
Суть этого приема в том, что для выполнения какой-то прикладной работы создаются два агента, работающих на разных контекстах.
Первый -- это агент-collector. Он принимает запросы на выполнение работы (сообщение receive_job) и сохраняет их в своей внутренней очереди. Данная очередь может иметь ограниченный объем, тем самым обеспечивается защита от перегрузки (т.е. лишние сообщения выбрасываются). При сохранении в очередь можно выполнять выбрасывание дубликатов (например, инициатор receive_job слишком быстро повторяет сообщения в отсутствии ответов). Так же очередь может быть приоритетной с учетом различных параметров получаемых receive_job-сообщений (сообщения с наиболее близким дедлайном могут иметь больший приоритет). Агент-collector может периодически чистить свою очередь, выбрасывая те сообщения, дедлайн для которых уже истек (тем самым обеспечивается учет времени жизни сообщения).
Второй агент в этой паре -- это агент-performer. Он выполняет запросы. Например, этот агент может выполнять длительные операции с БД, осуществлять обращения к внешним системам, время отклика которых может серьезно варьироваться, может осуществлять взаимодействие с оборудованием (SmartCard-ридерами, HSM-ами, подключенными через внешние порты контроллерами/датчиками и прочей периферией) и т.д. и т.п. Важно то, что время обработки одного запроса у performer-а гораздо выше, чем у collector-а. И это время может оказываться в довольно широком диапазоне -- от миллисекунд до десятков секунд.
Выполнив очередной запрос агент-performer запрашивает у collector-а следующий посредством отсылки collector-у сообщения select_next_job. В ответ collector отсылает performer-у следующий запрос для обработки в виде сообщения perform_job. Обработав perform_job агент-performer снова отсылает select_next_job агенту-collector-у, получает в ответ новый perform_job и т.д.
Ключевой момент здесь -- это работа collector-а и performer-а на разных контекстах. Т.к. длительность операций performer-а может в десятки раз превышать длительность операций collector-а, эти агенты обязательно должны быть привязаны к разным рабочим нитям. Что особенно важно в случаях, когда performer должен выполнять синхронные операции, например, работать с внешним оборудованием вроде SmartCard-ридеров и HSM-ами через сторонние библиотеки.
К счастью, привязать агентов к разным контекстам в SObjectizer проще простого, даже если эти агенты входят в одну кооперацию. Достаточно при добавлении агентов к кооперации указать, к какому диспетчеру они должны быть привязаны. Например, пусть агенты-collector-ы работают на диспетчере с одной общей рабочей нитью под названием "collectors", а агенты-performer-ы на диспетчере с пулом рабочих потоков под названием "performers":
// Добавляем к кооперации агента-collector-а. coop->add_agent( std::move( collector ), // Привязываем его к своему диспетчеру. so_5::disp::one_thread::create_disp_binder( "collectors" ) ); // Добавляем агента-performer-а. coop->add_agent( std::move( performer ), // Привязываем его к своему диспетчеру. so_5::disp::thread_pool::create_disp_binder( "performers", // При этом используем дефолтные параметры для работы // агента на этом диспетчере. so_5::disp::thread_pool::params_t{} ) ); |
Интересную возможность дает в этом плане диспетчер adv_thread_pool. Если объявить событие агента-performer-а для обработки сообщения perform_job как thread-safe, то это разрешает adv_thread_pool-диспетчеру одновременно запускать несколько таких событий на своих рабочих нитях. Т.е. если реализовать обработку perform_job так, чтобы не модифицировать в этот момент внутренности агента-performer-а, то можно обеспечить балансировку нагрузки на несколько рабочих нитей.
Работать это будет следующим образом:
- агент-performer получив perform_job сразу же отсылает агенту-collector-у новое сообщение select_next_job и начинает обработку полученного запроса. Текущая рабочая нить может быть занята на неопределенное время;
- агент-collector в ответ на select_next_job отсылает агенту-peformer-у следующее сообщение perform_job;
- диспетчер adv_thread_pool видит, что агент-performer уже работает на одной нити. Но, т.к. обработчик perform_job является thread_safe, то можно запустить еще один обработчик на другой рабочей нити;
- стартует еще один обработчик агента-performer-а на второй рабочей нити диспетчера. В начале он отсылает select_next_job агенту-collector-у, после чего начинает свою основную работу;
- агент-collector в ответ отсылает агенту-performer-у следующее сообщение perform_job и т.д.;
Когда у adv_thread_pool-диспетчера закончатся свободные рабочие нити, последнее отосланное сообщение perform_job застрянет в очереди диспетчера, откуда оно будет извлечено как только освободится какая-нибудь из рабочих нитей диспетчера.
Это примитивная схема, не лишенная недостатков. В частности, для того сообщения, которое застрянет в очереди adv_thread_pool-диспетчера, уже не получится проконтролировать дедлайн. Что, в принципе, можно обойти тем или иным способом. Зато эта схема не требует от программиста изобретения своих собственных велосипедов и позволяет использовать то, что уже есть в SObjectizer-е.
Открытым остается вопрос о том, нужно ли в состав SObjectizer-а включать какие-то заготовки-полуфабрикаты для агентов-collector-ов и performer-ов. На мой взгляд, преждевременно. Вот если накопится несколько таких реализаций в реальных проектах и можно будет проанализировать их общность, тогда к этому вопросу можно будет вернуться.
Комментариев нет:
Отправить комментарий