Версии SO-5.5.23 и so_5_extra-1.2.0 уже начинают дышать полной грудью. В SO-5.5.23 был реализован новый механизм, который позволяет вкладывать сообщение в некий "конверт". Внутри SObjectizer-а доставка идет уже для всего конверта с вложенным в него сообщением. Сообщение из конверта достается либо когда сообщение доставлено до получателя. Либо когда сообщение по какой-то причине преобразуется из одного представления в другое (например, так происходит в случае использования limit_then_transform). Причем под "доставлено до получателя" означает не только то, что получатель извлек сообщение из очереди, но и то, что у получателя был найден обработчик для этого типа сообщения. Т.е. сообщение реально доставлено, а не просто взято из очереди.
На базе этого механизма в so_5_extra-1.2.0 добавлены средства для реализации как гарантированно отзывных таймеров, так и просто для реализации отзывных сообщений. Для отзывных сообщений, кстати говоря, сразу же придумываются сценарии, в которых их можно использовать. Так, что, наверное, эта штука может быть востребована.
Ну а в качестве примера создания собственных "конвертов" в состав so_5_extra включена самая примитивна реализация такой прикольной штуки, как отчеты о доставке отосланных сообщений до получателя.
Эти самые отчеты о доставке -- это идея фикс, которая витала в воздухе очень и очень давно. Внимание она привлекает потому, что когда агент A отсылает сообщение агенту B, то далеко не факт, что сообщение до агента B вообще дойдет. Например, сообщение может быть отвергнуто механизмом защиты агента от перегрузки (например, limit_then_drop). И временами хотелось бы знать, сообщение до B не дошло или все-таки дошло. Но вот узнать это не представлялось возможным.>
Теперь же можно сделать собственный "конверт", в который будет вкладываться сообщение для B. И если конверт до B дойдет, значит и сообщение дошло. И простейшая реализация такого "конверта" включена в новый пример для so_5_extra. Исходный текст этого примера под катом, интересующиеся могут посмотреть.
Суть примера в том, что есть два агента: requests_generator_t и processor_t.
Агент processor_t работает хитрым образом: он обрабатывает сообщение request_t будучи в состоянии st_normal, но при обработке переходит в состояние st_busy. В этом состоянии агент тупо ничего не делает 2 секунды, затем возвращается в st_normal. Это означает, что если агенту processor_t отослать подряд N сообщений request_t, то только первое из них будет обработано, а остальные будут выброшены. Т.к. агент обработает первое в st_normal, а затем уйдет в st_busy, в котором все последующие сообщения игнорируются.
Агент requests_generator_t раз в 3 секунды перепосылает сообщения request_t пачками. Но не просто request_t, а экземпляр request_t, завернутый внутрь custom_envelope_t. А этот custom_envelope_t отсылает агенту requests_generator_t подтверждение, если сообщение было доставлено до processor_t. Получив подтверждение requests_generator_t забывает про доставленный запрос и в следующий раз отошлет уже пачку меньшего размера. Работа примера завершается тогда, когда requests_generator_t получит подтверждения для всех своих запросов.
Вот как раз custom_envelope_t и стал возможным благодаря нововведениям в SO-5.5.23. На практике от подобного "конверта" вряд ли будет польза. Но для примера открывающихся возможностей самое оно.
Ниже исходный код примера. Если что-то будет непонятно, то ничего страшного. Во-первых, можно задать вопрос. Во-вторых, я сажусь за написание статьи (вероятно для Хабра), в которой вся эта кухня будет расписана подробнее.
#include <so_5_extra/enveloped_msg/just_envelope.hpp> #include <so_5_extra/enveloped_msg/send_functions.hpp> #include <so_5/all.hpp> using namespace std::chrono_literals; namespace envelope_ns = so_5::extra::enveloped_msg; using id_t = int; // Type of request to be processed. struct request_t final { id_t m_id; std::string m_data; }; // Message to be used as delivery receipt for request delivery. struct delivery_receipt_t final { // ID of delivered request. id_t m_id; }; // Agent to process requests. class processor_t final : public so_5::agent_t { // Normal state. Agent accepts new requests in that state. state_t st_normal{this, "normal"}; // Busy state. Agent don't accepts new requests in that state. state_t st_busy{this, "busy"}; public: processor_t(context_t ctx) : so_5::agent_t{std::move(ctx)} { this >>= st_normal; st_normal.event(&processor_t::on_request); // No event handlers for st_busy. // But time for standing in st_busy is limited. st_busy.time_limit(2s, st_normal); } private: void on_request(mhood_t<request_t> cmd) { std::cout << "processor: on_request(" << cmd->m_id << ", " << cmd->m_data << ")" << std::endl; this >>= st_busy; } }; // A custom envelope for sending delivery_receipt. class custom_envelope_t final : public envelope_ns::just_envelope_t { // Destination for delivery receipt. const so_5::mbox_t m_to; // ID of delivered request. const id_t m_id; public: custom_envelope_t( so_5::message_ref_t payload, so_5::mbox_t to, id_t id) : envelope_ns::just_envelope_t{std::move(payload)} , m_to{std::move(to)} , m_id{id} {} void handler_found_hook(handler_invoker_t & invoker) noexcept override { // Send delivery receipt. so_5::send<delivery_receipt_t>(m_to, m_id); // Delegate an actual work to base class. envelope_ns::just_envelope_t::handler_found_hook(invoker); } }; // Agent to issue requests and resend them after some time. class requests_generator_t final : public so_5::agent_t { // Processor's mbox. const so_5::mbox_t m_processor; // Map of requests in flight. std::map<id_t, std::string> m_requests; struct resend_requests final : public so_5::signal_t {}; public: requests_generator_t(context_t ctx, so_5::mbox_t processor) : so_5::agent_t{std::move(ctx)} , m_processor{std::move(processor)} { so_subscribe_self() .event(&requests_generator_t::on_delivery_receipt) .event(&requests_generator_t::on_resend); } void so_evt_start() override { // Create requests to be delivered to processor. m_requests.emplace(0, "First"); m_requests.emplace(1, "Second"); m_requests.emplace(2, "Third"); m_requests.emplace(3, "Four"); // Send requests to processor. send_requests(); } private: void on_delivery_receipt(mhood_t<delivery_receipt_t> cmd) { std::cout << "request delivered: " << cmd->m_id << std::endl; m_requests.erase(cmd->m_id); if(m_requests.empty()) // No more requests. Work can be finished. so_deregister_agent_coop_normally(); } void on_resend(mhood_t<resend_requests>) { std::cout << "time to resend requests, pending requests: " << m_requests.size() << std::endl; send_requests(); } void send_requests() { for(const auto & item : m_requests) { std::cout << "sending request: (" << item.first << ", " << item.second << ")" << std::endl; envelope_ns::make<request_t>(item.first, item.second) .envelope<custom_envelope_t>(so_direct_mbox(), item.first) .send_to(m_processor); } // Send delayed message to resend non-delivered requests later. so_5::send_delayed<resend_requests>(*this, 3s); } }; int main() { so_5::launch([](so_5::environment_t & env) { env.introduce_coop([](so_5::coop_t & coop) { auto processor = coop.make_agent<processor_t>(); coop.make_agent<requests_generator_t>(processor->so_direct_mbox()); }); }); } |
Комментариев нет:
Отправить комментарий