Эксперименты с реактивными программированием и выстраиванием обработчиков сообщений в конвейеры начались из-за обсуждения релиза SO-5.5.5 на LOR-е. То, что получилось на данный момент -- это чистой воды черновой proof-of-concept, который развивается как один из примеров в дистрибутиве SO-5. Каких-либо планов по включению pipelines в состав штатных средств SObjectizer пока нет. Более того...
Сейчас вовсе не очевидно, нужно ли раскручивать эту тему дальше. Прямо сейчас идет выбор между тем, чтобы:
- Просто написать комментарии в коде примера, чтобы затем можно было разобраться что к чему, зафиксировать версию SO-5.5.6 и приступить к работе над SO-5.6.0;
- Попробовать в рамках этого примера расширить функциональность pipelines настолько, насколько это возможно. Затем таки написать комментарии, зафиксировать версию SO-5.5.6 и приступить к работе над SO-5.6.0 :)
Проблема первого варианта в том, что "нет ничего более постоянного, чем временное". Т.е. если упустить возможность сделать хорошо сейчас, потом возможности вернуться и переделать "по-уму", уже не будет.
Проблема второго варианта в том, что если расширять уже имеющуюся функциональность pipelines, скажем, такими вещами, как switcher и joiner, то совершенно не понятно, во что это выльется по трудозатратам. Т.е. может осенить сразу и красивое рабочее решение найдется буквально сегодня-завтра. А можно убить неделю-две и ничего путного не придумать.
Поэтому я пока просто покажу то, что есть уже сейчас. Ну а потом посмотрю, есть ли к этому какой-нибудь интерес. Если интереса не будет, то можно смело идти по первому варианту (т.е. консервировать работы в этом направлении).
Итак, в качестве примера с конвейером взят пример с обработкой данных от датчика температуры из вчерашнего поста:
RAW_val => validation => conversion ==> archiving
`=> distribution
`=> range_checking => alarm_detection ==> alarm_initiation
`=> alarm_distribution
Т.е. "сырые" данные, представляющие из себя два 8-ми битовых значения из регистров датчика, поступают на стадию валидации, дабы устранить эффект "дребезга" аппаратуры. Если данные признаны валидными, то они идут на стадию конвертации, т.е. преобразования двух 8-ми битовых полей в одно поле типа float с температурой в градусах Цельсия.
Сконвертированные данные поступают сразу на три параллельных, независимых друг от друга ветки. Первые две просто архивируют (например, в БД) и распространяют данные во внешний мир (например, посредством DDS). А вот третья ветка является еще одним конвейером из нескольких стадий.
Первая стадия третьей ветки -- это проверка на выход температуры за переделы разрешенного диапазона. Затем, если такой выход состоялся, следует вторая стадия -- проверка того, что это не единичный случайный выход, а реальный признак опасности. И если такая опасность обнаружена, то далее опять два параллельных, независимых действия: архивирование сигнала тревоги (например, в БД) и его распространение во внешний мир (например, посредством DDS).
В коде организация и использование такого конвейера записывается буквально вот так:
virtual void so_evt_start() override { // Создание конвейера. // Возвращается mbox самой первой стадии созданного конвейера. // Отсылая сообщения на этот mbox инициируется цепочка событий // по обработке отосланного сообщения. auto pipeline = make_pipeline( *this, src | stage(validation) | stage(conversion) | broadcast( src | stage(archivation), src | stage(distribution), src | stage(range_checking) | stage(alarm_detector{}) | broadcast( src | stage(alarm_initiator), src | stage( []( const alarm_detected & v ) { alarm_distribution( cerr, v ); } ) ) ), autoname ); send_delayed_to_agent< shutdown >( *this, chrono::seconds(1) ); // Использование конвейера. // Посредством отложенных сообщений имитируется поступление // данных от датчика температуры. for( uint8_t i = 0; i < static_cast< uint8_t >(250); i += 10 ) send_delayed< raw_value >( so_environment(), pipeline, chrono::milliseconds( i ), raw_measure{ 0, 0, i } ); } |
Здесь каждая цепочка начинается со специального макера src. В принципе, посредством перегрузки operator| можно было бы от этого вообще отказаться. Но пока src оставлен по двум основным причинам:
- так намного проще отслеживать, где именно начинается каждая из цепочек;
- со временем в src можно передавать какие-то параметры для конвейера (например, поведение конвейера в случае перегрузки, дефолтный диспетчер для стадий обработки и т.д.)
Стадии конвейера создаются либо вспомогательной функцией stage(), либо функцией broadcast(). Инструкция broadcast говорит о том, что результат обработки данной стадии будет размножен по всем заданным внутри broadcast-а параллельным веткам.
В качестве аргументов для stage() могут выступать обычные свободные функции (большинство стадий в этом примере являются таковыми), stateful-объекты с перегруженным operator() (здесь таковым является стадия alarm_detector), а так же лямбды (на самом деле это ни что иное, как те же самые stateful-объекты, но генерируемые автоматически компилятором). В этом примере есть все из вышеперечисленного. Впрочем, наверное проще привести полный текст всех обработчиков стадий конвейера:
msg_ptr_t< valid_raw_value > validation( const raw_value & v ) { if( 0x7 >= v.m_data.m_high_bits ) return make_message< valid_raw_value >( v.m_data ); else return make_empty< valid_raw_value >(); } msg_ptr_t< sensor_value > conversion( const valid_raw_value & v ) { return make_message< sensor_value >( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >( v.m_data.m_high_bits ) << 8) + v.m_data.m_low_bits) } ); } void archivation( const sensor_value & v ) { clog << "archiving (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } void distribution( const sensor_value & v ) { clog << "distributing (" << v.m_data.m_meter_id << "," << v.m_data.m_measure << ")" << endl; } msg_ptr_t< suspicion_value > range_checking( const sensor_value & v ) { if( v.m_data.m_measure >= 45.0f ) return make_message< suspicion_value >( v.m_data ); else return make_empty< suspicion_value >(); } class alarm_detector { using clock = chrono::steady_clock; public : msg_ptr_t< alarm_detected > operator()( const suspicion_value & v ) { if( m_has_value ) if( m_previous + chrono::milliseconds(25) > clock::now() ) { m_has_value = false; return make_message< alarm_detected >( v.m_data.m_meter_id ); } m_previous = clock::now(); m_has_value = true; return make_empty< alarm_detected >(); } private : clock::time_point m_previous; bool m_has_value = false; }; void alarm_initiator( const alarm_detected & v ) { clog << "=== alarm (" << v.m_meter_id << ") ===" << endl; } void alarm_distribution( ostream & to, const alarm_detected & v ) { to << "alarm_distribution (" << v.m_meter_id << ")" << endl; } |
Собственно, каждая стадия, за исключением alarm_detector, -- это простейшая функция. Alarm_detector чуть сложнее, т.к. он должен сохранять свое состояние между вызовами. Да и alarm_detector мог бы быть чуть проще, но не хотелось ради optional подключать Boost к проекту (а в MSVC++ компиляторах experimental/optional может и не быть).
Понятное дело, что если бы нужно было организовывать конвейеры в реальных приложениях, то кода в каждой стадии обработки было бы больше. Но, очевидно, что если бы каждую из вышеперечисленных стадий пришлось описывать в виде отдельного класса агента, с опеределением so_define_agent(), с провязыванием агентов в цепочки, то кода было бы еще больше. Так что в этом плане эксперимент оказался не только интересным, но и удачным.
Еще большим плюсом, нежели сокращение объема кода, является типобезопасность. Типы входного аргумента и результирующего значения каждой стадии конвейера выводятся шаблонной магией в compile-time. И в compile-time же происходит контроль за совместимостью двух соседних стадий по типам. Т.е. стадию conversion не получится соединить со стадией alarm_detector, поскольку выход conversion не совместим со входом для alarm_detector. И вот как раз ради такого строгого контроля со стороны компилятора весь этот эксперимент и затевался.
Текущая версия примера make_pipeline может быть найдена в репозитории. Сразу хочу предупредить, что код там черновой, непричесанный, без комментариев. Да и возни с C++ными шаблонами там порядком. А т.к. с шаблонной магией я на "Вы", то и отличного качества шаблонных наворотов ожидать не стоит.
Вот такая ситуация на сегодня. Как она изменится в последующие дни -- не знаю. С одной стороны, хочется реализовать такой тип обработчиков, как switcher. Но и застревать над ним на неделю-две нет желания. Так что буду думать.
Комментариев нет:
Отправить комментарий