пятница, 5 июня 2015 г.

[prog.c++11] Реактивное программирование поверх SObjectizer-5?

Крамольная идея о декларативном описании конвейеров для обработки сообщений (т.е. о передаче сообщения от одной стадии к другой), была подкинута в обсуждении на LOR-е. Первая пробная реализация была сделана почти сразу. Но потом захотелось большего :)

И вот что буквально только что заработало в качестве прототипа:

virtual void so_evt_start() override
{
   const auto self = so_direct_mbox();

   auto first_pipeline = make_pipeline( *this,
         src | first_stage( "=" ) | second_stage( "-" ) |
         broadcast(
            src | third_stage( self ),
            src | length_calculator() | length_printer() ),
         autoname );

   auto second_pipeline = make_pipeline( *this,
         src | first_stage( "#" ) | second_stage( "*" ) | third_stage( self ),
         autoname,
         // Every stage will work on its own thread.
         disp::active_obj::create_private_disp( so_environment() )->binder() );

   send< first >( first_pipeline, " one " );
   send< first >( second_pipeline, " two " );
}

Здесь создается два конвейера, с несколькими стадиями обработки внутри. Каждая стадия представляется в виде небольшой лямбды. При этом во время компиляции на уровне типов проверяется, чтобы то, что выдает в качестве результата предыдущая стадия, принималось на вход следующей стадией. Т.е. если первая стадия принимает сообщение типа first, а генерирует сообщение типа second, то следующая стадия должна принимать second на вход.

Для каждой стадии создается собственный агент, работа которого заключается в получении очередного сообщения, вызова заданной лямбды и передачи возвращенного из лямбды сообщения дальше. Если лямбда ничего не вернула, то ничего дальше и не отсылается. Что позволяет создавать, например, фильтры.

Каждый конвейер представляется в виде отдельной кооперации. Соответственно, можно назначить кооперации свой диспетчер. Это можно увидеть при создании second_pipeline, там идет привязка к диспетчеру с активными объектами. Так что каждая стадия второго конвейера будет представлена своей собственной рабочей нитью.

Но самое интересное происходит при создании first_pipeline :)

Там после второй стадии идет разделение обработки на два потока. Т.е. инструкция broadcasts() запускает обработку результата second_stage по двум параллельным, независимым веткам. Хотя, за счет C++ных variadic templates, можно делать и больше параллельных веток.

Конечно, это всего лишь прототип и тут есть еще над чем подумать. Например, как сделать так, чтобы при описании конвейера не нужно было начинать со специального маркера src. И как упростить описание стадий, чтобы не нужно было писать вот в таком духе:

stage_handler_t< third, fourth >
length_calculator()
{
   return []( const third & msg ) {
      return make_message< fourth >( msg.m_data.length() );
   };
}

Но еще интереснее придумать, как сделать так, чтобы по конвейеру можно было передавать сигнал EOF.

Это может потребоваться вот в каком случае: допустим, есть конвейер, одной из стадий которого является буфферизатор. Т.е. стадия, которая накапливает несколько мелких сообщений, а дальше отдает одно большое. Когда поток заканчивается, нужно флушировать содержимое буфферизатора. Вопрос в том, как это сделать.

Еще одна интересная задачка -- вставка одной стадии сразу в несколько конвейеров (т.е. создание точки пересечения/объединения конвейеров). И передача в стадию нескольких разных входящих сообщений (например, основное прикладное сообщение и сигнал от таймера). Да, в общем-то, можно и еще чего-нибудь нафантазировать :)

Отправить комментарий