пятница, 15 апреля 2016 г.

[prog.c++11] Пример использования C++ных шаблонов для сериализации/десериализации MQ-шных сообщений

Язык C++ вполне обоснованно считается сложным. Но, как мне представляется, возможности, которые оказываются в руках разработчика, все-таки перевешивают недостатки языка. В качестве примера того, как не самые простые вещи из C++ помогают в разработке софта, приведу небольшой пример из того, что довелось недавно делать.

Смысл в том, что нужно было реализовать общение нескольких приложений через MQTT-шный брокер. MQTT предоставляет транспорт и позволяет передавать произвольные сообщения. При этом на формат прикладных сообщений сам MQTT никаких ограничений не накладывает. Хочешь передавать XML -- пожалуйста, хочешь JSON -- нет вопросов, хочешь бинарный Protobuf -- никаких проблем.

Однако, на уровне прикладного кода хотелось бы оперировать уже десериализованными сообщениями, переложив заботы о сериализации/десериализации прикладных сообщений на какой-то промежуточный слой. Еще одно пожелание, которое хотелось бы учесть, состояло в том, что внутри одного приложения-клиента могло быть несколько независимых друг от друга подсистем, каждая из которых оперировало своим подмножеством топиков.

В качестве MQTT-клиента пока используется libmosquitto. Это C-шная библиотека, которая, как мне показалось, рассчитана на ну очень уж простые сценарии использования. И очевидно было, что для наших нужд вокруг libmosquitto нужно будет делать какую-то С++ную обертку, закрывающую ряд вопросов к libmosquitto.

Соответственно, раз такая обертка появляется, то сразу же появляется желание доверить ей и вопросы сериализации/десериализации прикладных сообщений. Грубо говоря, получаем очередное сообщение от MQTT в callback-функции, тут же парсим его и получаем десериализованное прикладное сообщение. Которое уже и отсылается подписчикам (коих может быть несколько).

С одной стороны, это очень заманчиво. С другой -- есть несколько неприятных моментов.

Во-первых, очень не хотелось "прибивать гвоздями" конкретный формат прикладных сообщений к libmosquitto-овской обертке. Т.к. вполне может возникнуть необходимость применять эту же обертку в других приложениях, в которых вместо JSON-а может использоваться XML или Protobuf. Кроме того, чисто теоретически, такая возможность может возникнуть даже внутри одного приложения. Напомню, что в рамках одного процесса может быт несколько подсистем со своими наборами топиков, поэтому каждая подсистема вполне может использовать свой формат данных. (Ситуацию с независимыми подсистемами можно было бы разрулить выдавая своей подсистеме собственное подключение к брокеру, но тогда у брокера количество клиентских подключений возрастало бы многократно.)

Во-вторых, операции десериализации вполне могут оказаться довольно тяжелыми. И делать их на единственной I/O-нити, на которой libmosquitto дергает on_message callbacks, не кажется хорошей идеей.

В-третьих, не понятно, как на I/O-нити libmosquitto реагировать на ошибки сериализации/десериализации. Т.е. попытались десериализовать, наткнулись на проблему. Скажем, в прикладом сообщении отсутствует какое-то обязательное поле. Или оно есть, но в неверном формате. И что делать? Если бы такая ошибка была диагностирована в прикладом обработчике, там можно было бы что-то предпринять кроме логирования самого факта проблемы. Например, если это было сообщение с запросом какого-то действия, то можно было бы ответить другим сообщением с описанием проблемы. Но в on_message callback-е такого не сделаешь.

Посему, было решено вынести операции сериализации/десериализации в прикладной код. Но освободить разработчика от рутинных операций по сериализации/десериализации. Для этого мы сделали вот что...

Ввели такое понятие, как TAG. Это тип, единственная задача которого определять формат представления прикладных сообщений. Поэтому TAG представляется пустыми структурами вида:

struct json_encoding {};

struct xml_encoding {};

struct binary_tlv_encoding {};

Далее были введены два шаблонных класса encoder_t<TAG,MSG> и decoder_t<TAG,MSG>, в каждом из которых определяется всего один статический метод (эти классы понадобились из-за того, что в C++ нельзя делать частичную специализацию функций):

templatetypename TAG, typename SOURCE_TYPE >
struct encoder_t
   {
      static std::string encode( const SOURCE_TYPE & );
   };

templatetypename TAG, typename RESULT_TYPE >
struct decoder_t
   {
      static RESULT_TYPE decode( const std::string & );
   };

После чего от прикладного разработчика требуется сделать либо частную, либо полную специализацию двух этих типов.

Частичная специализации зачастую оказывается достаточно. Например, в каждом типе прикладого сообщения есть методы to_json() и from_json(). Тогда достаточно будет сделать приблизительно такие специализации шаблонов encoder_t и decoder_t:

templatetypename RESULT_TYPE >
struct decoder_t< json_encoding, RESULT_TYPE >
   {
      static RESULT_TYPE decode( const std::string & payload )
      {
         RESULT_TYPE msg;
         msg.from_json( payload );
         return msg;
      }
   };

templatetypename SOURCE_TYPE >
struct encoder_t< json_encoding, SOURCE_TYPE >
   {
      static std::string encode( const SOURCE_TYPE & msg )
      {
         return msg.to_json();
      }
   };

Соответственно, для отсылки сообщений предоставляется шаблонный инструмент, который получает два параметра -- TAG и тип конкретного сообщения. Что-то вроде:

topic_publisher_t< json_encoding >::publish(
  instance, // Transport manager to be used.
  "clients/statuses/updates"// Topic for message.
  status_update_t{...} ); // Message to be published.

Здесь тип TAG для шаблонной функции publish задается явно, а вот тип сообщения выводится автоматически: в данном случае это status_update_t.

Поскольку одна подсистема, как правило, работает с одним и тем же форматом, то чтобы не писать везде TAG, можно воспользоваться using-ом:

using my_publisher = topic_publisher_t< json_encoding >;
...
my_publisher::publish(
  instance, // Transport manager to be used.
  "clients/statuses/updates"// Topic for message.
  status_update_t{...} ); // Message to be published.

По капотом у publish-а очень простая логика. Там просто дергается метод encode у класса encoder_t с соответствующими параметрами. Так, для приведенного выше примера метод encoder будет вызван у класса encoder_t<json_encoding_t, status_update_t>. Поскольку мы определили частичную специализацию encoder_t<json_encoding_t, SOURCE_TYPE>, то будет вызван код именно этой специализации. Что, в конечном итоге, приведет к вызову метода to_json у класса status_update_t.

Правда, реализация publish-а выглядит несколько страшнее, чем описание принципа его работы :) Но это просто следствие C++ного синтаксиса:

templatetypename ENCODER_TAG >
templatetypename MSG >
void
topic_publisher_t< ENCODER_TAG >::publish(
   const instance_t & instance,
   std::string topic_name,
   const MSG & msg )
   {
      so_5::send< publish_message_t >(
            instance.mbox(),
            std::move(topic_name),
            encoder_t< ENCODER_TAG, MSG >::encode( msg ) );
   }

С приемом входящих сообщений и их десериализацией ситуация несколько сложнее. В частности из-за того, что в топике могут ходить сообщения нескольких типов и при десериализации может потребоваться сперва понять, что же это за тип такой и как его десериализовать.

Поэтому десериализация сделана несколько хитрее. Сперва подписчику доставляется сообщение типа incoming_message_t<TAG>, в котором находится неразобранный payload (т.е. именно те данные, которые были получены от MQ). Для преобразования этого payload-а в конкретное сообщение нужно вызвать шаблонный метод decode(). В простом случае, когда в каждом топике ходят только сообщения одного типа, это может выглядеть вот так:

void on_status_update(const incoming_message_t< json_encoding > & cmd)
  {
    // cmd contains topic name and payload.
    // Actual message object should be extracted manually.
    const auto upd = cmd.decode< status_update_t >();
    ... // Some processing.
  }

При этом метод incoming_message_t<MSG>::decode() имеет чрезвычайно простую реализацию:

templatetypename DECODER_TAG >
class incoming_message_t : public so_5::message_t
   {
      ...
      const std::string m_payload;
   public :
      ...
      const std::string &
      payload() const { return m_payload; }

      templatetypename MSG >
      MSG decode() const
         {
            return decoder_t< DECODER_TAG, MSG >::decode( this->payload() );
         }
   };

Можно так же специлизировать классы encoder_t и decoder_t полностью (т.е. и по типу формата, и по типу сообщения). Причем, если приложению нужно только отсылать данные, то можно написать специализацию только класса encoder_t. Например, если мы хотим опубликовать текущие показатели какого-то датчика температуры в градусах в виде простой текстовой строки, то мы можем сделать это вот таким образом:

struct text_encoding {}; // Will be used as TAG for encoder.

struct temperature_sensor_data_t
   {
      std::uint8_t m_raw_data; // Raw data from sensor.
   };

// Encoding for temperature value.
template<>
class encoder_t< text_encoding, temperature_sensor_data_t >
   {
      static std::string encoder( const temperature_sensor_data_t & msg )
         {
            return fmt::format( "{} C", msg.m_raw_data / 10.0f );
         }
   }

// Publishing.
using temperature_publisher = topic_publisher_t< text_encoding >;
...
temperature_publisher::publish(
   instance, // Transport manager for publishing.
   fmt::format( "sensors/{}/last", sensor_id ), // Topic name.
   temperature_sensor_data_t{ sensor_value } ); // Message to be published.

Не знаю, может все это выглядит мудрено. Но, имхо, ничего сложного в этом нет. А вот такие фокусы -- это как раз то, из-за чего я до сих пор продолжаю использовать C++. Т.к. солидная выразительная мощь с одной стороны, и отличная эффективность с другой. Плюс широкий спектр платформ на которых все это может работать. Чего-либо подобного в мейнстриме не много. Если вообще есть ;)

Комментариев нет: