вторник, 2 мая 2017 г.

[prog.c++] Пример асинхронного однопоточного http-сервера на базе so-5.5, restinio и asio

Подготовка к релизу SO-5.5.19 вышла на финишную прямую. Есть надежда, что за неделю завершим подготовку документации и сопроводительных материалов, и после майских праздников выкатим официальный релиз. Пока же попробую на примитивном примере показать две основные фичи версии 5.5.19: способность SObjectizer-а работать только на одной рабочей нити и распространение мутабельных сообщений. Более того, в этом примере будет использоваться такой вариант SObjectizer Environment, который использует asio-шный event-loop в качестве бэк-энда. Подробнее обо всем этом ниже в посте.

Пример представляет из себя примитивнейший http-сервер, который при выполнении запроса к URL '/' генерирует простенькую html-страничку. Но генерирует с задержкой, которая постепенно увеличивается. При этом, не смотря на то, что обработка отдельного http-запроса "тормозит", http-сервер спокойно занимается приемом и обработкой параллельных запросов.

В качестве http-сервера используется наш restinio, который мы как раз и создавали для того, чтобы иметь возможность легко и непринужденно делать асинхронную обработку http-запросов. Так, благодаря архитектуре restinio мы имеем возможность делегировать обработку запроса SObjectizer-овскому агенту (хотя сам restinio к SObjectizer вообще не привязан и может использоваться отдельно). В примере задействована находящаяся в активной разработке версия restinio-0.2. В публичный ее репозиторий актуальные изменения мы пока еще не влили, вероятно сделаем это ближе к дате релиза SO-5.5.19.

Основная фишка данного примера в том, что в нем все работает на одной общей нити: и asio, и restinio, и SObjectizer. Т.е. SObjectizer не создает никакой дополнительной инфраструктуры, для диспетчеризации событий агентов и таймеров используется asio и евоный event-loop.

Вводных слов достаточно, давайте рассмотрим код примера. Сначала в виде небольших кусочков с пояснениями, а затем уже весь листинг целиком.

Итак, сначала подключаем необходимые заголовочные файлы:

#include <so_5_extra/env_infrastructures/asio/simple_not_mtsafe.hpp>

#include <so_5/all.hpp>

#include <restinio/all.hpp>

С подключением all.hpp из so_5 и restinio все должно быть понятно -- это публичные API SObjectizer-а и restinio соответственно. А вот подключение simple_not_mtsafe.hpp из какого-то so_5_extra нужно рассмотреть отдельно.

Во-первых, для того, чтобы SObjectizer смог работать как в многопоточном, так и в однопоточном режиме, в SObjectizer было введено специальное понятие environment infrastructure (или env_infrastructure). Т.е. это инфраструктура, которая требуется для того, чтобы SObjectizer мог выполнять свою работу. В состав SO-5.5.19 включено несколько штатных реализаций env_infrastructure: обычная многопоточная, однопоточная с обеспечением thread-safety и однопоточная без обеспечения этой самой thread-safety. Однако, поскольку пользователям SObjectizer этого может оказаться мало и захочется что-то с перламутровыми пуг специфическое, то нам показалось неразумно тянуть в ядро SObjectizer все подряд. Вместо этого мы начали создание дополнительного проекта so_5_extra, в который, как мы надеемся, будут помещаться дополнительные инструменты и расширения для SObjectizer.

На данный момент в so_5_extra есть всего одна штука -- это как раз реализация env_infrastructure на базе asio-шного event-loop-а. Именно эта реализация и подключается в первом #include.

Примечание. Пока что so_5_extra разрабатывается в приватном репозитории и мы откроем его код когда решим текущие вопросы с выбором подходящего типа лицензии для кода so_5_extra.

Каждый принятый http-запрос будет делегироваться для обработки отдельному SObjectizer-овскому агенту под названием request_handler. Логика работы у него простая: сперва он получает новый запрос, но не обрабатывает сразу, а откладывает обработку на некоторое время. Когда это время истекает, агент request_handler генерирует ответную html-страничку. В коде это выглядит вот так (не показаны только определения сообщений дабы не отвлекать внимания от основной логики):

class request_handler final : public so_5::agent_t {
public :
   request_handler(context_t ctx) : so_5::agent_t(std::move(ctx)) {
      // В конструктора сразу же подписываем события агента.
      // Он получает только два сообщения из своего собственного mbox-а.
      so_subscribe_self()
         .event(&request_handler::on_accept_request)
         .event(&request_handler::on_process_request);
   }

private :
   // Текущий размер паузы перед генерацией ответа на запрос.
   std::chrono::milliseconds current_pause_{ 25 };
   // На столько будет увеличиваться пауза при получении очередного запроса.
   static constexpr std::chrono::milliseconds delta{ 17 };

   // Реакция на поступление нового запроса.
   // Запрос получается как мутабельное сообщение для того, чтобы
   // переданное в сообщении содержимое можно было изъять и передать
   // через move в новое сообщение.
   void on_accept_request(mutable_mhood_t<accept_request> cmd) {
      // Имитатация задержки выполняется через простое отложенное
      // сообщение, которое агент отсылает сам себе.
      // В это отложенное сообщение мы перемещаем оригинальный запрос.
      so_5::send_delayed<so_5::mutable_msg<process_request>>(*this,
            current_pause_,
            std::move(cmd->req_),
            current_pause_);

      // Пауза должна быть увеличена, при этом она не должна расти бесконечно.
      current_pause_ = (current_pause_ + delta) % 3791;
   }

   // Пришло время все-таки обработать запрос.
   // Поскольку отложенное сообщение отсылалось как мутабельное,
   // то и обрабатываться оно должно как мутабельное.
   void on_process_request(mutable_mhood_t<process_request> cmd) {
      // Просто генерируем маленькую страничку в ответ с установкой
      // нескольких http-заголовков в ответе.
      cmd->req_->create_response()
         .append_header( "Server""RESTinio_0_2-SO_5_5_19-asio_1_11_0" )
         .append_header_date_field()
         .append_header( "Content-Type""text/html; charset=utf-8" )
         .set_body(
            fmt::format(
               "<html>"
               "<head><title>Example</title></head>"
               "<body>"
               "Request processing was delayed by {}ms."
               "</body>"
               "</html>",
               cmd->pause_.count()))
         .done(); // Все, при вызове done ответ пойдет ожидающему клиенту.
   }
};

Но агент request_handler -- это уже обработчик запросов, которые кто-то должен принять и передать агенту request_handler-у. Давайте посмотрим, как это происходит.

Нам потребуется http_server_t из restinio, но restinio::http_server_t -- это шаблонный класс, которому в качестве параметра шаблона нужно указать некий traits. Этот traits будет определять основные параметры работы http_server-а. Определим этот traits так:

using http_server_traits = restinio::single_thread_traits_t<
      restinio::asio_timer_factory_t,
      restinio::single_threaded_ostream_logger_t >;

Т.е. мы говорим, что нам потребуется однопоточный вариант http-сервера (restinio::single_threaded_traits_t), который для отслеживания тайм-аутов будет использовать механизм таймеров asio (restinio::asio_timer_factory_t). Кроме того, мы же хотим видеть какие-то следы обработки запросов нашим сервером. Для этого мы говорим, что http-сервер должен использовать примитивный логгер без защиты от многопоточности (restinio::single_threaded_ostream_logger_t). Этот логгер будет выплевывать сообщения прямо на стандартный поток вывода.

После того, как мы определили http_server_traits нам нужен экземпляр restinio::http_server_t<http_server_traits> у которого кто-то должен вызывать метод open() для старта работы сервера и метод close() для остановки сервера. Логичным видится запихивание этого дела в отдельного агента http_server, который выглядит приблизительно вот так:

class http_server final : public so_5::agent_t {
   restinio::http_server_t<http_server_traits> server_;

public:
   http_server(
      context_t ctx,
      asio::io_service & io_svc,
      so_5::mbox_t handler_mbox)
      :  so_5::agent_t(std::move(ctx))
      ,  server_(
            restinio::use_existing_io_service(io_svc),
            make_server_settings(std::move(handler_mbox)))
   {}

   virtual void so_evt_start() override {
      server_.open();
   }

   virtual void so_evt_finish() override {
      server_.close();
   }
};

Т.е. у агента http_server есть атрибут server_ типа restinio::http_server_t<http_server_traits>. Когда агент http_server начинает работать внутри SObjectizer-а, он вызывает sever_.open(). Когда завершает -- вызывает server_.close(). В принципе, все просто.

Но внутри http_server есть один вспомогательный метод, make_server_settings, который формирует набор конфигурационных параметров для http-сервера. Вот код этого метода:

static auto make_server_settings(so_5::mbox_t handler_mbox) {
   restinio::server_settings_t<http_server_traits> settings;
   // На этом адресе сервер будет слушать входящие подключения.
   settings.address("127.0.0.1");
   // Назначаем обработчик запросов. 
   settings.request_handler(
      // Данная лямбда будет вызываться каждый раз, как http-сервер распарсит
      // очередной запрос от очередного клиента.
      [handler_mbox](restinio::request_handle_t req) {
         // Воспринимаем только запросы на '/'.
         if("/" == req->header().request_target()) {
            // Делегируем запрос агенту request_handler.
            so_5::send<so_5::mutable_msg<request_handler::accept_request>>(
                  handler_mbox, std::move(req));
            // Этим мы говорим http-серверу, что запрос принят и нужно
            // ждать, пока будет сгенерирован ответ на запрос.
            return restinio::request_accepted();
         }
         else
            // Все остальные запросы мы отказываемся принимать, поэтому
            // http-сервер сам отдаст клиенту ответ "not implemented".
            return restinio::request_rejected();
      });
   return settings;
}

Ну вот, собственно, почти и все. Осталось только запустить SObjectizer с двумя агентами внутри. Один из них стартует http-сервер и начнет принимать запросы, а второй будет эти запросы обрабатывать:

void run() {
   // Asio на котором все будет работать.
   asio::io_service io_svc;
   // Запускаем SO-5.
   so_5::launch(
      [&](so_5::environment_t & env) {
         // Начальные действия для SObjectizer заключаются
         // в создании единственной кооперации с двумя агентами внутри.
         env.introduce_coop([&](so_5::coop_t & coop) {
            auto handler = coop.make_agent<request_handler>();
            coop.make_agent<http_server>( std::ref(io_svc), handler->so_direct_mbox());
         });
      },
      [&](so_5::environment_params_t & params) {
         // SO-5 нужно должным образом настроить. Он должен использовать
         // специализированный env_infrastructure, благодаря которому все
         // события будут обрабатываться через asio-шный event-loop.
         namespace asio_env = so_5::extra::env_infrastructures::asio::simple_not_mtsafe;
         params.infrastructure_factory(asio_env::factory(io_svc));
      });
}

Запускаем, делаем парочку запросов вида:

$ curl -i http://localhost:8080/
HTTP/1.1 200 OK
Connection: keep-alive
Content-Length: 100
Server: RESTinio_0_2-SO_5_5_19-asio_1_11_0
Date: :Tue, 02 May 2017 08:44:00 GMT
Content-Type: text/html; charset=utf-8

<html><head><title>Example</title></head><body>Request processing was delayed by 25ms.</body></html>

$ curl -i http://localhost:8080/data
HTTP/1.1 501 Not Implemented
Connection: close
Content-Length: 0

И видим в логе работы что-то вроде:

$ ./target/vc15_0_19_10_25017_x64/release/sample.restinio_plus_so5extra.exe
[2017-05-02 11:43:43.441] TRACE: starting server on 127.0.0.1:8080
[2017-05-02 11:43:43.444]  INFO: server started  on 127.0.0.1:8080
[2017-05-02 11:44:00.650] TRACE: accept connection from: 127.0.0.1:60354
[2017-05-02 11:44:00.653] TRACE: [connection:1] start connection with 127.0.0.1:60354
[2017-05-02 11:44:00.654] TRACE: [connection:1] start waiting for request
[2017-05-02 11:44:00.656] TRACE: [connection:1] continue reading request
[2017-05-02 11:44:00.657] TRACE: [connection:1] received 78 bytes
[2017-05-02 11:44:00.658] TRACE: [connection:1] request received (#0): GET /
[2017-05-02 11:44:00.689] TRACE: [connection:1] append response (#0), flags: { final_parts, connection_keepalive }, bufs count: 2
[2017-05-02 11:44:00.695] TRACE: [connection:1] sending resp data, buf count: 2
[2017-05-02 11:44:00.697] TRACE: [connection:1] outgoing data was sent
[2017-05-02 11:44:00.707] TRACE: [connection:1] should keep alive
[2017-05-02 11:44:00.712] TRACE: [connection:1] start waiting for request
[2017-05-02 11:44:00.721] TRACE: [connection:1] continue reading request
[2017-05-02 11:44:00.725] TRACE: [connection:1] EOF and no request, close connection
[2017-05-02 11:44:00.731] TRACE: [connection:1] close
[2017-05-02 11:44:00.737] TRACE: [connection:1] destroyed
[2017-05-02 11:44:05.234] TRACE: accept connection from: 127.0.0.1:60356
[2017-05-02 11:44:05.236] TRACE: [connection:2] start connection with 127.0.0.1:60356
[2017-05-02 11:44:05.238] TRACE: [connection:2] start waiting for request
[2017-05-02 11:44:05.240] TRACE: [connection:2] continue reading request
[2017-05-02 11:44:05.243] TRACE: [connection:2] received 82 bytes
[2017-05-02 11:44:05.246] TRACE: [connection:2] request received (#0): GET /data
[2017-05-02 11:44:05.249] TRACE: [connection:2] append response (#0), flags: { final_parts, connection_close }, bufs count: 1
[2017-05-02 11:44:05.251] TRACE: [connection:2] sending resp data with connection-close attribute buf count: 1
[2017-05-02 11:44:05.257] TRACE: [connection:2] outgoing data was sent
[2017-05-02 11:44:05.259] TRACE: [connection:2] close
[2017-05-02 11:44:05.263] TRACE: [connection:2] destroyed

Вот, в общем-то и все. Пост, правда, получился из категории: вот что мы уже умеем, но исходники никому не отдадим :) Но нам нужно еще несколько дней, чтобы довести restinio-0.2 и so_5_extra до презентабельного состояния. Так что как только разберемся с текущими вопросами, сразу же опубликуем (что-то на bitbucket-е, что-то на sourceforge).

Теперь полный код примера. Довольно объемный, но это в большей степени из-за многословности C++.

#include <so_5_extra/env_infrastructures/asio/simple_not_mtsafe.hpp>

#include <so_5/all.hpp>

#include <restinio/all.hpp>

//
// request_handler
//
class request_handler final : public so_5::agent_t {
public :
   struct accept_request final : public so_5::message_t {
      restinio::request_handle_t req_;

      accept_request(restinio::request_handle_t req) : req_(std::move(req)) {}
   };

   struct process_request final : public so_5::message_t {
      restinio::request_handle_t req_;
      std::chrono::milliseconds pause_;

      process_request(
         restinio::request_handle_t req,
         std::chrono::milliseconds pause)
         : req_(std::move(req))
         , pause_(pause)
      {}
   };

   request_handler(context_t ctx) : so_5::agent_t(std::move(ctx)) {
      so_subscribe_self()
         .event(&request_handler::on_accept_request)
         .event(&request_handler::on_process_request);
   }

private :
   std::chrono::milliseconds current_pause_{ 25 };
   static constexpr std::chrono::milliseconds delta{ 17 };

   void on_accept_request(mutable_mhood_t<accept_request> cmd) {
      so_5::send_delayed<so_5::mutable_msg<process_request>>(*this,
            current_pause_,
            std::move(cmd->req_),
            current_pause_);

      current_pause_ = (current_pause_ + delta) % 3791;
   }

   void on_process_request(mutable_mhood_t<process_request> cmd) {
      cmd->req_->create_response()
         .append_header( "Server""RESTinio_0_2-SO_5_5_19-asio_1_11_0" )
         .append_header_date_field()
         .append_header( "Content-Type""text/html; charset=utf-8" )
         .set_body(
            fmt::format(
               "<html>"
               "<head><title>Example</title></head>"
               "<body>"
               "Request processing was delayed by {}ms."
               "</body>"
               "</html>",
               cmd->pause_.count()))
         .done();
   }
};

//
// Traits for http_server.
//
using http_server_traits = restinio::single_thread_traits_t<
      restinio::asio_timer_factory_t,
      restinio::single_threaded_ostream_logger_t >;

//
// http_server
//
class http_server final : public so_5::agent_t {
   restinio::http_server_t<http_server_traits> server_;

   static auto make_server_settings(so_5::mbox_t handler_mbox) {
      restinio::server_settings_t<http_server_traits> settings;
      settings.address("127.0.0.1");
      settings.request_handler(
         [handler_mbox](restinio::request_handle_t req) {
            if("/" == req->header().request_target()) {
               so_5::send<so_5::mutable_msg<request_handler::accept_request>>(
                     handler_mbox, std::move(req));
               return restinio::request_accepted();
            }
            else
               return restinio::request_rejected();
         });
      return settings;
   }

public:
   http_server(
      context_t ctx,
      asio::io_service & io_svc,
      so_5::mbox_t handler_mbox)
      :  so_5::agent_t(std::move(ctx))
      ,  server_(
            restinio::use_existing_io_service(io_svc),
            make_server_settings(std::move(handler_mbox)))
   {}

   virtual void so_evt_start() override {
      server_.open();
   }

   virtual void so_evt_finish() override {
      server_.close();
   }
};

void run() {
   asio::io_service io_svc;
   so_5::launch(
      [&](so_5::environment_t & env) {
         env.introduce_coop([&](so_5::coop_t & coop) {
            auto handler = coop.make_agent<request_handler>();
            coop.make_agent<http_server>( std::ref(io_svc), handler->so_direct_mbox());
         });
      },
      [&](so_5::environment_params_t & params) {
         namespace asio_env = so_5::extra::env_infrastructures::asio::simple_not_mtsafe;
         params.infrastructure_factory(asio_env::factory(io_svc));
      });
}

int main() {
   try {
      run();
      return 0;
   }
   catch(const std::exception & x) {
      std::cerr << "Exception: " << x.what() << std::endl;
   }

   return 2;
}
Отправить комментарий