понедельник, 18 декабря 2017 г.

[prog.c++] Еще одна попытка упростить работу с отложенными сообщениями в SO-5

Прошу прощения у тех читателей блога, которым не интересно читать посты про SObjectizer. Но данный пост будет продолжением темы про отмену отложенных сообщений, начатую на прошлой неделе. Тогдашнее обсуждение показало, что предлагаемый способ вряд ли можно считать хорошим, к тому же он не решал определенных потребностей отдельных пользователей SO-5. Поэтому данная тема продолжила занимать мое внимание и вот очередная итерация. Кому интересно понаблюдать за эволюцией решений, которые со временем [не]попадают в SObjectizer, милости прошу под кат. Для всех остальных там ничего интересного не будет, сорри. Блог так устроен, что я пишу сюда о том, что занимает меня в конкретный момент времени...

Итак, вкратце о том, какие две существующие на практике задачки/проблемки хочется решить.

Первая проблема в том, что отмена отложенного сообщения через вызов timer_id_t::release() вовсе не гарантирует того, что сообщение не будет доставлено агенту. Если сообщение уже покинуло нить таймера и оказалось в очереди сообщений агента, то оно до агента все-таки дойдет. Даже не смотря на то, что агент уже не хочет сообщение получать. Поэтому приходится снабжать отложенные сообщения идентификаторами и при поступлении отложенного сообщения проверять актуальность содержащегося в сообщении индентификатора.

Вторая задача состоит в том, чтобы упростить прикладному обработчику ограниченного по времени ожидания какого-либо сообщения. Например, агент хочет ждать сообщения A не дольше 300ms. И если сообщение A не пришло, то агент хочет получить сообщение B. Но если сообщение A таки пришло, то сообщения B агент видеть не хочет вообще. Вне зависимости от того, успело оно попасть в очередь или нет.

Думается, что есть подход, который позволяет достаточно понятным для пользователя SObjectizer-а способом закрыть обе эти задачи.

Идея состоит в том, чтобы создать отдельный mbox, в который должно прийти ожидаемое агентом сообщение. И в этот же mbox будет приходить сообщение об истечении тайм-аута, если ожидаемое сообщение так и не поступило. Соответственно, агент подписывается на данный mbox. В общем-то это и все. В коде работа с таким mbox-ом может выглядеть следующим образом:

class demo : public so_5::agent_t {
   // Здесь мы будем хранить специальный mbox для операции get_status.
   so_5::extra::mboxes::time_window::handle_t<> status_op_mbox_;
   ...
   void initiate_get_status() {
      // Нам нужно запросить статус у другого агента.
      // В ответ мы ожидаем сообщения current_status, но если
      // ответ не пришел в течении 300ms, то мы хотим получить
      // сообщение status_timedout.

      // Для этого создаем специальный mbox и подписываемся на него.
      status_op_mbox_ = so_5::extra::mboxes::time_window::make_mbox<>(*this);
      so_subscribe(status_op_mbox_.as_mbox())
            .event(demo::on_current_status)
            .event(demo::on_status_timedout);

      // Отсылаем запрос статуса стороннему агенту и в качестве обратного
      // адреса указываем наш специальный mbox.
      so_5::send<get_status>(coworker_agent_, status_op_mbox_.as_mbox(), ...);

      // И активируем ожидание на нашем специальном mbox-е.
      // Дополнительные параметры, которые указываются в activate будут
      // переданы в конструктор экземпляра status_timedout.
      status_op_mbox_.activate<status_timeout>(300ms, ...);
   }

   // Обработчик ответного сообщения с текущим статусом.
   void on_current_status(mhood_t<current_status> cmd) {
      ... // Какая-то прикладная обработка.
   }

   // Обработчик тайм-аута операции get_status.
   void on_status_timedout(mhood_t<status_timedout> cmd) {
      ... // Обработка тайм-аута.
   }
};

Если в процессе ожидания агент решает, что ждать он больше не хочет, то он просто вызывает метод cancel():

// Какой-то другой ответ от агента, статус которого мы хотим получить.
// Этот ответ дает нам понять, что агент жив-здоров и нет смысла
// ждать от него current_stats.
void on_coworker_restarted(mhood_t<restared_notify> cmd) {
   // Т.к. нет смысла ждать статуса, то просто отменяем эту операцию.
   status_op_mbox_.cancel();
   ...
}

При этом, если был вызван cancel(), то к агенту ни одно сообщение, которое он ждал из специального mbox-а, не дойдет. Даже если сами сообщения уже стоят у него в очереди. Не дойдут потому, что cancel() отменит все подписки агента на сообщения из mbox-а.

Как по мне, так этот подход позволяет обрабатывать разные сценарии. Начиная от самых простых. Скажем, простое отменяемое отложенное сообщение: создаем time_window::handle_t, подписываемся на отложенное сообщение из этого mbox-а, затем вызваем activate(). Т.е. делаем практически то же самое, что и с send_delayed/send_periodic. Когда определяем, что тайм-аут нам больше не нужен, то просто вызываем cancel(). Подписка автоматически уничтожается, отложенное сообщение до нас не дойдет.

Но возможны и более сложные сценарии. Например, мы можем ждать не сообщение одного типа, а нескольких. Скажем, сообщения current_status, restarted_notify и worker_overloaded. И обрабатывать эти сообщения в разных состояниях. Все это делается обычным образом -- путем подписок соответствующих обработчиков агента в соответствующих состояниях.

То, что какие-то операции могут занимать много места в коде, можно будет спрятать посредством вспомогательных функций. Например, если нам не нужно отменять отложенную операцию, а обработчики всегда подписываются в одном и том же состоянии, то можно воспользоваться чем-то вроде:

auto reply_mbox = so_5::extra::mboxes::time_window::initiate_wait<status_timeout>(
      // Агент, для которого все это делается.
      *this,
      // Обработчики, которые должны быть подписаны.
      &demo::on_current_status,
      &demo::on_status_timeout,
      // Сколько ждать ответа.
      300ms, 
      // Все остальное пойдет в конструктор status_timeout.
      ... );
// Отсылаем запрос статуса стороннему агенту и в качестве обратного
// адреса указываем наш специальный mbox.
so_5::send<get_status>(coworker_agent_, reply_mbox, ...);

В общем, на первый взгляд выглядит вполне как искомое решение. Но критику, вопросы и замечания все равно хотелось бы услышать.

Если же общее впечатление останется положительным, то тогда можно будет тщательно подумать о том, как же это все реализовать. Ибо пока еще нет понимания того, как некоторые вещи для этих mbox-ов сделать, чтобы никакого мусора в процессе работы не накапливалось и память не текла.

Отправить комментарий