вторник, 16 октября 2018 г.

[prog.c++.sobjectizer] Сбылась мечта идиота: можно сделать отчеты о доставке сообщения до получателя.

Версии SO-5.5.23 и so_5_extra-1.2.0 уже начинают дышать полной грудью. В SO-5.5.23 был реализован новый механизм, который позволяет вкладывать сообщение в некий "конверт". Внутри SObjectizer-а доставка идет уже для всего конверта с вложенным в него сообщением. Сообщение из конверта достается либо когда сообщение доставлено до получателя. Либо когда сообщение по какой-то причине преобразуется из одного представления в другое (например, так происходит в случае использования limit_then_transform). Причем под "доставлено до получателя" означает не только то, что получатель извлек сообщение из очереди, но и то, что у получателя был найден обработчик для этого типа сообщения. Т.е. сообщение реально доставлено, а не просто взято из очереди.

На базе этого механизма в so_5_extra-1.2.0 добавлены средства для реализации как гарантированно отзывных таймеров, так и просто для реализации отзывных сообщений. Для отзывных сообщений, кстати говоря, сразу же придумываются сценарии, в которых их можно использовать. Так, что, наверное, эта штука может быть востребована.

Ну а в качестве примера создания собственных "конвертов" в состав so_5_extra включена самая примитивна реализация такой прикольной штуки, как отчеты о доставке отосланных сообщений до получателя.

Эти самые отчеты о доставке -- это идея фикс, которая витала в воздухе очень и очень давно. Внимание она привлекает потому, что когда агент A отсылает сообщение агенту B, то далеко не факт, что сообщение до агента B вообще дойдет. Например, сообщение может быть отвергнуто механизмом защиты агента от перегрузки (например, limit_then_drop). И временами хотелось бы знать, сообщение до B не дошло или все-таки дошло. Но вот узнать это не представлялось возможным.

Теперь же можно сделать собственный "конверт", в который будет вкладываться сообщение для B. И если конверт до B дойдет, значит и сообщение дошло. И простейшая реализация такого "конверта" включена в новый пример для so_5_extra. Исходный текст этого примера под катом, интересующиеся могут посмотреть.

Суть примера в том, что есть два агента: requests_generator_t и processor_t.

Агент processor_t работает хитрым образом: он обрабатывает сообщение request_t будучи в состоянии st_normal, но при обработке переходит в состояние st_busy. В этом состоянии агент тупо ничего не делает 2 секунды, затем возвращается в st_normal. Это означает, что если агенту processor_t отослать подряд N сообщений request_t, то только первое из них будет обработано, а остальные будут выброшены. Т.к. агент обработает первое в st_normal, а затем уйдет в st_busy, в котором все последующие сообщения игнорируются.

Агент requests_generator_t раз в 3 секунды перепосылает сообщения request_t пачками. Но не просто request_t, а экземпляр request_t, завернутый внутрь custom_envelope_t. А этот custom_envelope_t отсылает агенту requests_generator_t подтверждение, если сообщение было доставлено до processor_t. Получив подтверждение requests_generator_t забывает про доставленный запрос и в следующий раз отошлет уже пачку меньшего размера. Работа примера завершается тогда, когда requests_generator_t получит подтверждения для всех своих запросов.

Вот как раз custom_envelope_t и стал возможным благодаря нововведениям в SO-5.5.23. На практике от подобного "конверта" вряд ли будет польза. Но для примера открывающихся возможностей самое оно.

Ниже исходный код примера. Если что-то будет непонятно, то ничего страшного. Во-первых, можно задать вопрос. Во-вторых, я сажусь за написание статьи (вероятно для Хабра), в которой вся эта кухня будет расписана подробнее.

#include <so_5_extra/enveloped_msg/just_envelope.hpp>
#include <so_5_extra/enveloped_msg/send_functions.hpp>

#include <so_5/all.hpp>

using namespace std::chrono_literals;

namespace envelope_ns = so_5::extra::enveloped_msg;

using id_t = int;

// Type of request to be processed.
struct request_t final
{
   id_t m_id;
   std::string m_data;
};

// Message to be used as delivery receipt for request delivery.
struct delivery_receipt_t final
{
   // ID of delivered request.
   id_t m_id;
};

// Agent to process requests.
class processor_t final : public so_5::agent_t
{
   // Normal state. Agent accepts new requests in that state.
   state_t st_normal{this"normal"};
   // Busy state. Agent don't accepts new requests in that state.
   state_t st_busy{this"busy"};

public:
   processor_t(context_t ctx) : so_5::agent_t{std::move(ctx)}
   {
      this >>= st_normal;

      st_normal.event(&processor_t::on_request);

      // No event handlers for st_busy.
      // But time for standing in st_busy is limited.
      st_busy.time_limit(2s, st_normal);
   }

private:
   void on_request(mhood_t<request_t> cmd)
   {
      std::cout << "processor: on_request(" << cmd->m_id << ", "
            << cmd->m_data << ")" << std::endl;

      this >>= st_busy;
   }
};

// A custom envelope for sending delivery_receipt.
class custom_envelope_t final : public envelope_ns::just_envelope_t
{
   // Destination for delivery receipt.
   const so_5::mbox_t m_to;

   // ID of delivered request.
   const id_t m_id;

public:
   custom_envelope_t(
      so_5::message_ref_t payload,
      so_5::mbox_t to,
      id_t id)
      :  envelope_ns::just_envelope_t{std::move(payload)}
      ,  m_to{std::move(to)}
      ,  m_id{id}
   {}

   void
   handler_found_hook(handler_invoker_t & invoker) noexcept override
   {
      // Send delivery receipt.
      so_5::send<delivery_receipt_t>(m_to, m_id);
      // Delegate an actual work to base class.
      envelope_ns::just_envelope_t::handler_found_hook(invoker);
   }
};

// Agent to issue requests and resend them after some time.
class requests_generator_t final : public so_5::agent_t
{
   // Processor's mbox.
   const so_5::mbox_t m_processor;

   // Map of requests in flight.
   std::map<id_t, std::string> m_requests;

   struct resend_requests final : public so_5::signal_t {};

public:
   requests_generator_t(context_t ctx, so_5::mbox_t processor)
      :  so_5::agent_t{std::move(ctx)}
      ,  m_processor{std::move(processor)}
   {
      so_subscribe_self()
         .event(&requests_generator_t::on_delivery_receipt)
         .event(&requests_generator_t::on_resend);
   }

   void so_evt_start() override
   {
      // Create requests to be delivered to processor.
      m_requests.emplace(0"First");
      m_requests.emplace(1"Second");
      m_requests.emplace(2"Third");
      m_requests.emplace(3"Four");

      // Send requests to processor.
      send_requests();
   }

private:
   void on_delivery_receipt(mhood_t<delivery_receipt_t> cmd)
   {
      std::cout << "request delivered: " << cmd->m_id << std::endl;
      m_requests.erase(cmd->m_id);

      if(m_requests.empty())
         // No more requests. Work can be finished.
         so_deregister_agent_coop_normally();
   }

   void on_resend(mhood_t<resend_requests>)
   {
      std::cout << "time to resend requests, pending requests: "
            << m_requests.size() << std::endl;

      send_requests();
   }

   void send_requests()
   {
      for(const auto & item : m_requests)
      {
         std::cout << "sending request: (" << item.first << ", "
               << item.second << ")" << std::endl;

         envelope_ns::make<request_t>(item.first, item.second)
               .envelope<custom_envelope_t>(so_direct_mbox(), item.first)
               .send_to(m_processor);
      }

      // Send delayed message to resend non-delivered requests later.
      so_5::send_delayed<resend_requests>(*this, 3s);
   }
};

int main()
{
   so_5::launch([](so_5::environment_t & env) {
      env.introduce_coop([](so_5::coop_t & coop) {
         auto processor = coop.make_agent<processor_t>();
         coop.make_agent<requests_generator_t>(processor->so_direct_mbox());
      });
   });
}

Комментариев нет: