Добавленный в версии 5.5.13 механизм mchain-ов может использоваться не только для организации взаимодействия между SObjectizer-овскими и не-SObjectizer-овскими частями приложения. Но и для защиты агентов от перегрузки. В версии 5.5.14 добавлен новый пример producer_consumer_mchain, который демонстрирует эту возможность. Под катом несколько слов о том, как это работает.
Суть проблемы producer-consumer в том, что producer может генерировать больше сообщений для consumer-а, чем consumer в состоянии обработать за тоже самое время. Так, если producer генерирует 150 msg/sec, а consumer обрабатывает всего 100 msg/sec, то каждую секунду в очереди consumer-а будет накапливаться по 50 необработанных сообщений. Если очередь к consumer-у никак не защищена, то она будет пухнуть до тех пор, пока остается доступная приложению память. При этом производительность приложения будет деградировать вплоть до совсем уж непотребных тормозов, особенно когда начнется активный своппинг.
Поэтому consumer-ы должны быть защищены от перегрузки каким-то механизмом. Можно, например, выбрасывать "лишние" сообщения, которые consumer обработать не в состоянии. Скажем, самые старые из тех, что уже стоят в очереди consumer-а. Или, напротив, самые новые, для которых места в очереди нет. Можно притормозить producer-а: как только очередь consumer-а наполняется, producer приостанавливается на попытке добавить новое сообщение в очередь пока там не появится свободное место. Можно сочетать несколько из этих подходов, скажем, приостанавливать producer-а в случае полной очереди, но не более чем на 0.5s, после чего выбрасывать самое старое сообщение из очереди.
Изначально SObjectizer-5 не содержал никаких механизмов защиты агентов от перегрузки. Задача создания таких механизмов лежала на плечах прикладного разработчика (рекомендации о том, как это делать с использованием подхода collector-performer несколько раз описывались в данном блоге: #1, #2, #3, #4).
Затем в SO-5.5 были добавлены лимиты для сообщений, которые с минимальными усилиями позволяли защищать агентов от перегрузки самым примитивным способом: выбрасыванием новых сообщений или их перенаправлением на других consumer-ов.
И вот теперь сделан следующий шаг в предоставлении пользователям SObjectizer готовых инструментов для организации механизмов overload control. Это ограниченные по размеру mchain-ы, которые могут приостанавливать отправителя сообщения при попытке добавить сообщение в полный mchain. А так же могут выбрасывать либо самое старое сообщение, либо самое новое. Более того, для отправителя сообщения mchain может выглядеть как совершенно обычный mbox, так что producer может просто не знать, что он отсылает сообщения в mchain.
Новый пример producer_consumer_mchain как раз демонстрирует взаимодействие нескольких producer-ов с единственным consumer-ом через mchain фиксированного размера. При попытке отослать сообщение в заполненный mchain producer будет приостановлен на вызове send. Если за время ожидания в mchain-е появляется свободное место, то сообщение будет помещено в mchain. Если нет -- то будет выброшено исключение, которое дает producer-у знать, что его сообщение до consumer-а доставлено не будет.
Агент-consumer обрабатывает содержимое mchain-а пачками, не более 5 штук за раз. Если после обработки очередной пачки mchain не опустел, то consumer инициирует обработку очередной пачки. А вот если mchain становится пустым, то consumer ждет нотификации о поступлении первого сообщения в пустой mchain. Эту нотификацию он отсылает себе сам, для чего вешает на mchain специальный нотификатор. Т.е. consumer не пытается висеть на receive на пустом mchain-е и тем самым блокирует контекст своей рабочей нити. Вместо этого он сам себе отсылает сигнал когда mchain перестает быть пустым, а затем обрабатывает содержимое mchain-а при получении этого сигнала.
В коде все это выглядит приблизительно вот так:
class consumer final : public so_5::agent_t { // Этот сигнал consumer будет отсылать сам себе когда // его mchain перестает быть пустым. При получении данного // сигнала содержимое mchain-а обрабатывается порциями по // несколько штук. struct chain_has_requests : public so_5::signal_t {}; public : consumer( context_t ctx, so_5::mbox_t logger_mbox ) : so_5::agent_t{ ctx // Могут возникать ситуации, когда receive извлекает // последнее сообщение из mchain-а и тут же кто-то помещает // новое сообщение в уже пустой mchain. При этом генерируется // сигнал chain_has_requests. Но receive извлекает это сообщение, // mchain опять становится пустым и тут же кто-то помещает // еще одно сообщение в уже пустой mchain. С генерацией // очередного chain_has_requests. Для того, чтобы такие // chain_has_requests не скапливались в очереди агента consumer-а // их количество жестко ограничивается и лишние сигналы будут // просто игнорироваться. + limit_then_drop< chain_has_requests >(1) } , m_logger_mbox{ std::move(logger_mbox) } { // Создание mchain-а, из которого агент consumer будет брать // сообщения-запросы для обработки. m_chain = so_environment().create_mchain( so_5::make_limited_with_waiting_mchain_params( // Ограничиваем количество сообщений в mchain-е. 10, // mchain будет использовать заранее выделенный // буфер фиксированного размера для хранения заявок. so_5::mchain_props::memory_usage_t::preallocated, // Если свободное место в буфере не появилось даже после // ожидания, то send будет бросать исключение. so_5::mchain_props::overflow_reaction_t::throw_exception, // При попытке добавить сообщение в полный mchain можно // приостановить отправителя на это время. std::chrono::milliseconds(150) ) // Для mchain-а нужен нотификатор, который будет отсылать // агенту consumer-у сигнал о том, что mchain не пуст и можно // обработать несколько запросов. .not_empty_notificator( [this] { so_5::send< chain_has_requests >( *this ); } ) ); // Сразу же подписываем consumer-а на сигнал о наличии ожидающих // обработки запросов. Подписку можно делать где угодно, не обязательно // переопределять so_define_agent() для таких простых случаев. so_subscribe_self().event< chain_has_requests >( &consumer::process_requests ); } // Агент consumer владеет mchain-ом и наружу он его показывает как // обычный mbox. so_5::mbox_t consumer_mbox() const { return m_chain->as_mbox(); } private : const so_5::mbox_t m_logger_mbox; so_5::mchain_t m_chain; // Обработчик очередной порции запросов. // Вызывается как реакция на сигнал chain_has_requests. void process_requests() { // Функция receive инициирует обработку пачки запросов из mchain-а. auto r = receive( // Обрабатываем не более 5 запросов за раз. // Выходим из receive если mchain оказывается пустым. from( m_chain ).handle_n( 5 ).no_wait_on_empty(), []( const request & req ) { // Просто имитируем какую-то обработку для того, чтобы // не возвращаться из обработчика сразу же. std::this_thread::sleep_for( random_pause() ); // Отсылаем ответ на запрос отправителю. so_5::send< reply >( req.m_who, req.m_payload + "#handled" ); } ); m_logger_mbox <<= msg_maker() << "=== " << r.handled() << " request(s) handled"; if( !m_chain->empty() ) // После обработки пачки запросов mchain все еще не пуст. // Поэтому шлем себе очередной chain_has_requests для того, // чтобы обработчик process_requests был вызван еще раз. so_5::send< chain_has_requests >( *this ); } static std::chrono::milliseconds random_pause() { std::random_device rd; std::mt19937 gen{ rd() }; return std::chrono::milliseconds( std::uniform_int_distribution< unsigned int >{2u, 25u}(gen) ); } }; |
Комментариев нет:
Отправить комментарий