Подготовка к релизу SO-5.5.19 вышла на финишную прямую. Есть надежда, что за неделю завершим подготовку документации и сопроводительных материалов, и после майских праздников выкатим официальный релиз. Пока же попробую на примитивном примере показать две основные фичи версии 5.5.19: способность SObjectizer-а работать только на одной рабочей нити и распространение мутабельных сообщений. Более того, в этом примере будет использоваться такой вариант SObjectizer Environment, который использует asio-шный event-loop в качестве бэк-энда. Подробнее обо всем этом ниже в посте.
Пример представляет из себя примитивнейший http-сервер, который при выполнении запроса к URL '/' генерирует простенькую html-страничку. Но генерирует с задержкой, которая постепенно увеличивается. При этом, не смотря на то, что обработка отдельного http-запроса "тормозит", http-сервер спокойно занимается приемом и обработкой параллельных запросов.
В качестве http-сервера используется наш restinio, который мы как раз и создавали для того, чтобы иметь возможность легко и непринужденно делать асинхронную обработку http-запросов. Так, благодаря архитектуре restinio мы имеем возможность делегировать обработку запроса SObjectizer-овскому агенту (хотя сам restinio к SObjectizer вообще не привязан и может использоваться отдельно). В примере задействована находящаяся в активной разработке версия restinio-0.2. В публичный ее репозиторий актуальные изменения мы пока еще не влили, вероятно сделаем это ближе к дате релиза SO-5.5.19.
Основная фишка данного примера в том, что в нем все работает на одной общей нити: и asio, и restinio, и SObjectizer. Т.е. SObjectizer не создает никакой дополнительной инфраструктуры, для диспетчеризации событий агентов и таймеров используется asio и евоный event-loop.
Вводных слов достаточно, давайте рассмотрим код примера. Сначала в виде небольших кусочков с пояснениями, а затем уже весь листинг целиком.
Итак, сначала подключаем необходимые заголовочные файлы:
#include <so_5_extra/env_infrastructures/asio/simple_not_mtsafe.hpp> #include <so_5/all.hpp> #include <restinio/all.hpp> |
С подключением all.hpp из so_5 и restinio все должно быть понятно -- это публичные API SObjectizer-а и restinio соответственно. А вот подключение simple_not_mtsafe.hpp из какого-то so_5_extra нужно рассмотреть отдельно.
Во-первых, для того, чтобы SObjectizer смог работать как в многопоточном, так и в однопоточном режиме, в SObjectizer было введено специальное понятие environment infrastructure (или env_infrastructure). Т.е. это инфраструктура, которая требуется для того, чтобы SObjectizer мог выполнять свою работу. В состав SO-5.5.19 включено несколько штатных реализаций env_infrastructure: обычная многопоточная, однопоточная с обеспечением thread-safety и однопоточная без обеспечения этой самой thread-safety. Однако, поскольку пользователям SObjectizer этого может оказаться мало и захочется что-то с перламутровыми пуг специфическое, то нам показалось неразумно тянуть в ядро SObjectizer все подряд. Вместо этого мы начали создание дополнительного проекта so_5_extra, в который, как мы надеемся, будут помещаться дополнительные инструменты и расширения для SObjectizer.
На данный момент в so_5_extra есть всего одна штука -- это как раз реализация env_infrastructure на базе asio-шного event-loop-а. Именно эта реализация и подключается в первом #include.
Примечание. Пока что so_5_extra разрабатывается в приватном репозитории и мы откроем его код когда решим текущие вопросы с выбором подходящего типа лицензии для кода so_5_extra.
Каждый принятый http-запрос будет делегироваться для обработки отдельному SObjectizer-овскому агенту под названием request_handler. Логика работы у него простая: сперва он получает новый запрос, но не обрабатывает сразу, а откладывает обработку на некоторое время. Когда это время истекает, агент request_handler генерирует ответную html-страничку. В коде это выглядит вот так (не показаны только определения сообщений дабы не отвлекать внимания от основной логики):
class request_handler final : public so_5::agent_t { public : request_handler(context_t ctx) : so_5::agent_t(std::move(ctx)) { // В конструктора сразу же подписываем события агента. // Он получает только два сообщения из своего собственного mbox-а. so_subscribe_self() .event(&request_handler::on_accept_request) .event(&request_handler::on_process_request); } private : // Текущий размер паузы перед генерацией ответа на запрос. std::chrono::milliseconds current_pause_{ 25 }; // На столько будет увеличиваться пауза при получении очередного запроса. static constexpr std::chrono::milliseconds delta{ 17 }; // Реакция на поступление нового запроса. // Запрос получается как мутабельное сообщение для того, чтобы // переданное в сообщении содержимое можно было изъять и передать // через move в новое сообщение. void on_accept_request(mutable_mhood_t<accept_request> cmd) { // Имитатация задержки выполняется через простое отложенное // сообщение, которое агент отсылает сам себе. // В это отложенное сообщение мы перемещаем оригинальный запрос. so_5::send_delayed<so_5::mutable_msg<process_request>>(*this, current_pause_, std::move(cmd->req_), current_pause_); // Пауза должна быть увеличена, при этом она не должна расти бесконечно. current_pause_ = (current_pause_ + delta) % 3791; } // Пришло время все-таки обработать запрос. // Поскольку отложенное сообщение отсылалось как мутабельное, // то и обрабатываться оно должно как мутабельное. void on_process_request(mutable_mhood_t<process_request> cmd) { // Просто генерируем маленькую страничку в ответ с установкой // нескольких http-заголовков в ответе. cmd->req_->create_response() .append_header( "Server", "RESTinio_0_2-SO_5_5_19-asio_1_11_0" ) .append_header_date_field() .append_header( "Content-Type", "text/html; charset=utf-8" ) .set_body( fmt::format( "<html>" "<head><title>Example</title></head>" "<body>" "Request processing was delayed by {}ms." "</body>" "</html>", cmd->pause_.count())) .done(); // Все, при вызове done ответ пойдет ожидающему клиенту. } }; |
Но агент request_handler -- это уже обработчик запросов, которые кто-то должен принять и передать агенту request_handler-у. Давайте посмотрим, как это происходит.
Нам потребуется http_server_t из restinio, но restinio::http_server_t -- это шаблонный класс, которому в качестве параметра шаблона нужно указать некий traits. Этот traits будет определять основные параметры работы http_server-а. Определим этот traits так:
using http_server_traits = restinio::single_thread_traits_t< restinio::asio_timer_factory_t, restinio::single_threaded_ostream_logger_t >; |
Т.е. мы говорим, что нам потребуется однопоточный вариант http-сервера (restinio::single_threaded_traits_t), который для отслеживания тайм-аутов будет использовать механизм таймеров asio (restinio::asio_timer_factory_t). Кроме того, мы же хотим видеть какие-то следы обработки запросов нашим сервером. Для этого мы говорим, что http-сервер должен использовать примитивный логгер без защиты от многопоточности (restinio::single_threaded_ostream_logger_t). Этот логгер будет выплевывать сообщения прямо на стандартный поток вывода.
После того, как мы определили http_server_traits нам нужен экземпляр restinio::http_server_t<http_server_traits> у которого кто-то должен вызывать метод open() для старта работы сервера и метод close() для остановки сервера. Логичным видится запихивание этого дела в отдельного агента http_server, который выглядит приблизительно вот так:
class http_server final : public so_5::agent_t { restinio::http_server_t<http_server_traits> server_; public: http_server( context_t ctx, asio::io_service & io_svc, so_5::mbox_t handler_mbox) : so_5::agent_t(std::move(ctx)) , server_( restinio::use_existing_io_service(io_svc), make_server_settings(std::move(handler_mbox))) {} virtual void so_evt_start() override { server_.open(); } virtual void so_evt_finish() override { server_.close(); } }; |
Т.е. у агента http_server есть атрибут server_ типа restinio::http_server_t<http_server_traits>. Когда агент http_server начинает работать внутри SObjectizer-а, он вызывает sever_.open(). Когда завершает -- вызывает server_.close(). В принципе, все просто.
Но внутри http_server есть один вспомогательный метод, make_server_settings, который формирует набор конфигурационных параметров для http-сервера. Вот код этого метода:
static auto make_server_settings(so_5::mbox_t handler_mbox) { restinio::server_settings_t<http_server_traits> settings; // На этом адресе сервер будет слушать входящие подключения. settings.address("127.0.0.1"); // Назначаем обработчик запросов. settings.request_handler( // Данная лямбда будет вызываться каждый раз, как http-сервер распарсит // очередной запрос от очередного клиента. [handler_mbox](restinio::request_handle_t req) { // Воспринимаем только запросы на '/'. if("/" == req->header().request_target()) { // Делегируем запрос агенту request_handler. so_5::send<so_5::mutable_msg<request_handler::accept_request>>( handler_mbox, std::move(req)); // Этим мы говорим http-серверу, что запрос принят и нужно // ждать, пока будет сгенерирован ответ на запрос. return restinio::request_accepted(); } else // Все остальные запросы мы отказываемся принимать, поэтому // http-сервер сам отдаст клиенту ответ "not implemented". return restinio::request_rejected(); }); return settings; } |
Ну вот, собственно, почти и все. Осталось только запустить SObjectizer с двумя агентами внутри. Один из них стартует http-сервер и начнет принимать запросы, а второй будет эти запросы обрабатывать:
void run() { // Asio на котором все будет работать. asio::io_service io_svc; // Запускаем SO-5. so_5::launch( [&](so_5::environment_t & env) { // Начальные действия для SObjectizer заключаются // в создании единственной кооперации с двумя агентами внутри. env.introduce_coop([&](so_5::coop_t & coop) { auto handler = coop.make_agent<request_handler>(); coop.make_agent<http_server>( std::ref(io_svc), handler->so_direct_mbox()); }); }, [&](so_5::environment_params_t & params) { // SO-5 нужно должным образом настроить. Он должен использовать // специализированный env_infrastructure, благодаря которому все // события будут обрабатываться через asio-шный event-loop. namespace asio_env = so_5::extra::env_infrastructures::asio::simple_not_mtsafe; params.infrastructure_factory(asio_env::factory(io_svc)); }); } |
Запускаем, делаем парочку запросов вида:
$ curl -i http://localhost:8080/
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 100
Server: RESTinio_0_2-SO_5_5_19-asio_1_11_0
Date: :Tue, 02 May 2017 08:44:00 GMT
Content-Type: text/html; charset=utf-8
<html><head><title>Example</title></head><body>Request processing was delayed by 25ms.</body></html>
$ curl -i http://localhost:8080/data
HTTP/1.1 501 Not Implemented
Connection: close
Content-Length: 0
И видим в логе работы что-то вроде:
$ ./target/vc15_0_19_10_25017_x64/release/sample.restinio_plus_so5extra.exe
[2017-05-02 11:43:43.441] TRACE: starting server on 127.0.0.1:8080
[2017-05-02 11:43:43.444] INFO: server started on 127.0.0.1:8080
[2017-05-02 11:44:00.650] TRACE: accept connection from: 127.0.0.1:60354
[2017-05-02 11:44:00.653] TRACE: [connection:1] start connection with 127.0.0.1:60354
[2017-05-02 11:44:00.654] TRACE: [connection:1] start waiting for request
[2017-05-02 11:44:00.656] TRACE: [connection:1] continue reading request
[2017-05-02 11:44:00.657] TRACE: [connection:1] received 78 bytes
[2017-05-02 11:44:00.658] TRACE: [connection:1] request received (#0): GET /
[2017-05-02 11:44:00.689] TRACE: [connection:1] append response (#0), flags: { final_parts, connection_keepalive }, bufs count: 2
[2017-05-02 11:44:00.695] TRACE: [connection:1] sending resp data, buf count: 2
[2017-05-02 11:44:00.697] TRACE: [connection:1] outgoing data was sent
[2017-05-02 11:44:00.707] TRACE: [connection:1] should keep alive
[2017-05-02 11:44:00.712] TRACE: [connection:1] start waiting for request
[2017-05-02 11:44:00.721] TRACE: [connection:1] continue reading request
[2017-05-02 11:44:00.725] TRACE: [connection:1] EOF and no request, close connection
[2017-05-02 11:44:00.731] TRACE: [connection:1] close
[2017-05-02 11:44:00.737] TRACE: [connection:1] destroyed
[2017-05-02 11:44:05.234] TRACE: accept connection from: 127.0.0.1:60356
[2017-05-02 11:44:05.236] TRACE: [connection:2] start connection with 127.0.0.1:60356
[2017-05-02 11:44:05.238] TRACE: [connection:2] start waiting for request
[2017-05-02 11:44:05.240] TRACE: [connection:2] continue reading request
[2017-05-02 11:44:05.243] TRACE: [connection:2] received 82 bytes
[2017-05-02 11:44:05.246] TRACE: [connection:2] request received (#0): GET /data
[2017-05-02 11:44:05.249] TRACE: [connection:2] append response (#0), flags: { final_parts, connection_close }, bufs count: 1
[2017-05-02 11:44:05.251] TRACE: [connection:2] sending resp data with connection-close attribute buf count: 1
[2017-05-02 11:44:05.257] TRACE: [connection:2] outgoing data was sent
[2017-05-02 11:44:05.259] TRACE: [connection:2] close
[2017-05-02 11:44:05.263] TRACE: [connection:2] destroyed
Вот, в общем-то и все. Пост, правда, получился из категории: вот что мы уже умеем, но исходники никому не отдадим :) Но нам нужно еще несколько дней, чтобы довести restinio-0.2 и so_5_extra до презентабельного состояния. Так что как только разберемся с текущими вопросами, сразу же опубликуем (что-то на bitbucket-е, что-то на sourceforge).
Теперь полный код примера. Довольно объемный, но это в большей степени из-за многословности C++.
#include <so_5_extra/env_infrastructures/asio/simple_not_mtsafe.hpp> #include <so_5/all.hpp> #include <restinio/all.hpp> // // request_handler // class request_handler final : public so_5::agent_t { public : struct accept_request final : public so_5::message_t { restinio::request_handle_t req_; accept_request(restinio::request_handle_t req) : req_(std::move(req)) {} }; struct process_request final : public so_5::message_t { restinio::request_handle_t req_; std::chrono::milliseconds pause_; process_request( restinio::request_handle_t req, std::chrono::milliseconds pause) : req_(std::move(req)) , pause_(pause) {} }; request_handler(context_t ctx) : so_5::agent_t(std::move(ctx)) { so_subscribe_self() .event(&request_handler::on_accept_request) .event(&request_handler::on_process_request); } private : std::chrono::milliseconds current_pause_{ 25 }; static constexpr std::chrono::milliseconds delta{ 17 }; void on_accept_request(mutable_mhood_t<accept_request> cmd) { so_5::send_delayed<so_5::mutable_msg<process_request>>(*this, current_pause_, std::move(cmd->req_), current_pause_); current_pause_ = (current_pause_ + delta) % 3791; } void on_process_request(mutable_mhood_t<process_request> cmd) { cmd->req_->create_response() .append_header( "Server", "RESTinio_0_2-SO_5_5_19-asio_1_11_0" ) .append_header_date_field() .append_header( "Content-Type", "text/html; charset=utf-8" ) .set_body( fmt::format( "<html>" "<head><title>Example</title></head>" "<body>" "Request processing was delayed by {}ms." "</body>" "</html>", cmd->pause_.count())) .done(); } }; // // Traits for http_server. // using http_server_traits = restinio::single_thread_traits_t< restinio::asio_timer_factory_t, restinio::single_threaded_ostream_logger_t >; // // http_server // class http_server final : public so_5::agent_t { restinio::http_server_t<http_server_traits> server_; static auto make_server_settings(so_5::mbox_t handler_mbox) { restinio::server_settings_t<http_server_traits> settings; settings.address("127.0.0.1"); settings.request_handler( [handler_mbox](restinio::request_handle_t req) { if("/" == req->header().request_target()) { so_5::send<so_5::mutable_msg<request_handler::accept_request>>( handler_mbox, std::move(req)); return restinio::request_accepted(); } else return restinio::request_rejected(); }); return settings; } public: http_server( context_t ctx, asio::io_service & io_svc, so_5::mbox_t handler_mbox) : so_5::agent_t(std::move(ctx)) , server_( restinio::use_existing_io_service(io_svc), make_server_settings(std::move(handler_mbox))) {} virtual void so_evt_start() override { server_.open(); } virtual void so_evt_finish() override { server_.close(); } }; void run() { asio::io_service io_svc; so_5::launch( [&](so_5::environment_t & env) { env.introduce_coop([&](so_5::coop_t & coop) { auto handler = coop.make_agent<request_handler>(); coop.make_agent<http_server>( std::ref(io_svc), handler->so_direct_mbox()); }); }, [&](so_5::environment_params_t & params) { namespace asio_env = so_5::extra::env_infrastructures::asio::simple_not_mtsafe; params.infrastructure_factory(asio_env::factory(io_svc)); }); } int main() { try { run(); return 0; } catch(const std::exception & x) { std::cerr << "Exception: " << x.what() << std::endl; } return 2; } |
Комментариев нет:
Отправить комментарий