По мотивам недавнего сообщения на счет реактивного программирования поверх SObjectizer-5. Хочется в качестве иллюстрации предложить какой-то интересный и более-менее жизненный пример. И как раз с этим есть сложности...
Первый пример, который на данный момент выглядит наиболее привлекательно, такой: выполняется опрос некоторого датчика температуры, после чего выполняется несколько стадий обработки полученного значения. Как то: валидация "сырого" значения датчика (т.е. исключение эффектов "дребезга" аппаратуры), конвертация "сырого" значения в актуальное, архивирование актуального значения, проверка актуального значения на выходы за допустимый диапазон, проверка наличия нескольких подряд выходов за допустимый диапазон, реакция на такие выходы...
В графическом виде это можно представить как-то так:
RAW_val => validation => conversion ==> archiving
`=> distribution
`=> range_checking => alarm_detection ==> alarm_initiation
`=> alarm_distribution
Т.е. на вход конвейеру приходит сообщение raw_value, содержащее, допустим, два 8-ми битовых значения из регистров датчика. Это сообщение проверяется на корректность. Допустим, значение первого регистра не должно превышать 0x7 (т.е. ненулевыми могут быть только три младших бита). Если стадия validation не обнаруживает проблем, то дальше идет valid_raw_value. Если обнаруживает, то raw_value дальше не идет, обработка завершается на стадии validation (т.е. проблемное значение тупо выбрасывается).
После стадии validation сообщение valid_raw_value попадает на стадию conversion, где значение 8-ми битовых регистров преобразуются в значение float. И далее уже идет сообщение sensor_value, которое содержит показатель температуры воздуха в градусах Цельсия.
Обработка sensor_value разделяется на три независимые ветки:
- значение просто архивируется в какое-то хранилище (например, посредством сервиса statsd);
- значение передается куда-то наружу (например, посредством DDS или MQTT);
- значение проверяется на попадание в разрешенный диапазон. И, если попадание не произошло, проверяется, не следует ли обрабатывать данную ситуацию как аварийную.
Если ветки архивации и распространения sensor_value просты и содержат всего по одной операции, то третья ветка, с проверкой на попадание в разрешенный диапазон, интереснее.
Стадия range_checking выпускает наружу сообщение suspicion_value, если значение во входном sensor_value не укладывается в заданные рамки. Если же sensor_value содержит нормальное значение. то range_checking ничего не возвращает (сообщение "проглатывается" данной стадией).
Стадия alarm_detection получает только suspiction_value и здесь контролируется, приходят ли два подозрительных значения в течении короткого времени. Например, если за 10 секунд поступает три сообщения о подозрительном значении, значит речь идет о настоящем выходе за разрешенные приделы, а не о случайном всплеске, который мог быть спровоцирован разными факторами (включая и погрешность измерений).
Если сдатия alarm_detection действительно обнаруживает устойчивую проблему, то она выдает сообщение alarm_detected. И обработка этого сообщения, в свою очередь, разделяется на две ветки: инициирование обработки тревоги (например, отключение нагревательных приборов, включение вентиляции и т.д.) и распространение сигнала о найденном отклонении (например, посредством DDS или MQTT).
Что смущает в этом примере. В первую очередь то, что для обработки таких мелких значений, как показания датчика температуры, нет особой надобности в организации конвейера из нескольких агентов (а за каждой стадией обработки будет прятаться агент). Наверняка, намного эффективнее будет строить такую обработку внутри одно-двух агентов, без ненужной пересылки сообщений между стадиями. Например, validation/conversion/range_checking/alarm_detection вполне могут выполняться сразу на одном и том же рабочем контексте. И только стадии, требующие взаимодействия с внешним миром (archiving, distribution, alarm_initiation, alarm_distribution), могут быть представлены отдельными агентами, работающими на своем собственном контексте.
Поэтому есть еще мысль о другом примере, в котором использование разных контекстов будет выглядеть более оправданным.
Пусть нам нужно подготовить и отослать зашифрованное и подписанное сообщение. Пусть сообщение достаточно большое, поэтому шифровать его все посредством асимметричного алгоритма слишком накладно. Поэтому мы генерируем для сообщения уникальный сессионный ключ, шифруем симметричным алгоритмом все сообщение этим сессионным ключом, а сам ключ шифруем открытым ключом получателя сообщения. После чего все вместе (т.е. шифрованное сообщение + шифрованный сессионный ключ) подписываем своим закрытым ключом и отправляем получателю. Последовательность получается приблизительно такая:
MSG => sess_key_gen => sess_key_encrypt => msg_encrypt => msg_sign => delivering
Однако, здесь явно напрашивается раздвоение обработки, с последующим объединением нескольких веток:
MSG => sess_key_gen ==> sess_key_encrypt ==> concat_ket_body => msg_sign => delivering
`=> msg_encrypt =/
А вот такого слияния-то как раз сейчас и нет. Раздвоение есть, но оно разрешено только на конечных стадиях конвейера (как с примером про счетчик температуры).
Посему интересно, а нет ли у читателей примеров организации конвейеров из дорогостоящих (в плане расхода CPU) операций. Например, с обработкой изображений (скажем dеnoise => sharpening => timestamping => watermarking) или аудио?
Возвращаясь к примеру со счетчиком температуры.
Есть еще такая мысль, что полезно было бы иметь возможность разделить поток обработки в зависимости от значения. Например, показанный выше конвейер можно было бы определить так:
RAW_val => validation ==> illegal_value_processor => meter_restarter
`=> conversion ==> archiving
`=> distribution
`=> range_checking => alarm_detection ==> alarm_initiation
`=> alarm_distribution
Стадия validation может вернуть либо valid_raw_value, если значение счетчика признано корректным, либо illegal_raw_value, если значение явно "левое". Корректное значение далее обрабатывается обычным образом -- конвертируется и т.д. А вот некорректное поступает на вход к специальной стадии, которая похожа на alarm_detection. Например, если за 10 секунд пришло три некорректных значения, то нужно дать команду на программный ресет датчика.
Получается, что если дать возможность вставлять в конвейер стадию, работающую по логике if (или switch), то можно делать и другие типы конвейеров. Например, обработка PDU в каком-нибудь бинарном протоколе:
BYTES => pdu_parser ==> PDU_1_processor => ...
`=> PDU_2_processor => ...
`=> PDU_3_processor => ...
...
Такой фичи пока тоже нет. Как и соображений о том, как ее реализовать. Ведь сейчас стадии конвейера объединяются за счет типов возвращаемых значений: на вход стадии (i+1) можно подать то, что вернула стадия (i). Следовательно, стадия с логикой if должна уметь возвращать два значения с разными типами. А стадия с логикой switch -- и еще больше. Но ведь паттерн-матчинга в C++ нет :)
Ну а теперь, собственно говоря, вопрос к читателям: вам бы хватило только первого примера со счетчиком температуры?
Если нет, то почему? И какой бы пример конвейеров вам был бы интересен?
Update. Похоже, есть простой способ реализации switcher-ов без использования аналогов паттер-матчинга. Достаточно отказаться от способа возврата обработки входного сообщения в виде возвращаемого значения функции. Стадии конвейера можно сделать в виде функций, которые получают, как минимум, два аргумента: ссылку на входящее сообщение и sink-объект для приема результата. Т.е. вместо чего-то такого:
several_results< valid_raw_value, illegal_raw_value > validator( const raw_value & in ) { if( 0x7 <= in.m_data.m_high_bits ) return result< valid_raw_value >( in.m_data ); else return result< illegal_raw_value >( in.m_data.m_meter_id ); } |
Можно было бы писать так:
void validator( const raw_value & in, sink< valid_raw_value > valid_case, sink< illegal_raw_value > invalid_case ) { if( 0x7 <= in.m_data.m_high_bits ) valid_case.push( in.m_data ); else invalid_case.push( in.m_data.m_meter_id ); } |
И конвейеры можно организовывать вот так:
make_pipeline( *this, src | switcher( validator, // stage handler. src | stage( convesion ) | ..., // valid_case pipeline. src | stage( illegal_value_processor ) | stage( reseter ) // invalid_case pipeline. ) ); make_pipeline( *this, src | switcher( pdu_parser, src | PDU_1_processor | ..., src | PDU_2_processor | ..., src | PDU_3_processor | ..., ... ) ); |
Update 2. Похоже, этот способ не такой уж простой :)
Комментариев нет:
Отправить комментарий