Давеча опубликовал на Хабре большую статью на английском языке "A declarative data-processing pipeline on top of actors? Why not?", в которой описал старый, но доведенный "до ума" пример make_pipeline из состава SObjectizer-а. К самой статье Григорий Демченко задал интересные вопросы в комментарии. Плюс на Reddit-е подсунули ссылку на (как мне показалось) недоделанную и полузаброшенную библиотеку RaftLib.
В общем, появилось навязчивое желание сделать продолжение этой темы.
По сути, тема создания пайплайнов (или графов в общем случае) обработки данных на C++ состоит из двух частей. Можно сказать, что есть верхняя и нижняя части проблемы.
Нижняя часть -- это то, как работа распределяется по рабочим контекстам. Т.е. это вопросы, связанные с тем, что из себя на самом деле представляют стадии пайплайна, как стадии привязываются к тем или иным рабочим нитям, как происходит передача информации между стадиями пайплайна. В принципе, нижнюю часть не обязательно делать с нуля самому. Можно использовать какой-то готовый инструмент. Скажем, Intel TBB. Или, как в моем случае, SObjectizer. С нижним уровнем связано несколько вопросов и мне хочется посмотреть, какие ответы на эти вопросы может (и может ли) предоставить SObjectizer.
Верхняя часть -- это тот DSL, который будет предоставляться пользователю для конструирования своих пайплайнов. И здесь есть ряд интересных и не исследованных для меня вопросов. Начиная от того, какую функциональность пользователь сможет получить в свои руки. И заканчивая тем, как выразить эту функциональность в C++ коде, дабы получить контроль за какими-то ошибками прямо в compile-time.
Пока что есть желание потратить несколько дней на эксперименты в этой области. Первые соображения о том, какие цели преследуются и какие мысли уже есть, изложены под катом. Кому интересно, милости прошу подключаться к обсуждению.
Цели и задачи
Итак, главная задача, про которую вряд ли стоит лишний раз говорить -- это привлечение внимания к SObjectizer-у. К моему сожалению, время показало, что далеко не всегда люди, узнавая про наш инструмент, понимали как и для чего они могут его применить. Этим SObjectizer разительно (и не в лучшую сторону) отличается от RESTinio и json_dto. Поэтому статьи, вроде упомянутой выше, призваны дать потенциальным пользователям дополнительную пищу для размышлений. И забросить дополнительный якорь в память читателям ;)
Помимо главной задачи есть желание попробовать сделать инструмент, который был бы лишен ряда серьезных недостатков примитивного пайплайна из примера make_pipeline. А именно:
- нет возможности вернуть более одного значения. В примере make_pipeline каждая стадия могла вернуть либо одно значение всего одного типа, либо вообще не вернуть ни одного. Поэтому нельзя было делать стадии, которые на одно входящее сообщение могли бы сгенерировать несколько исходящих сообщений. В том числе и несколько исходящих сообщений одного типа;
- как следствие предыдущего недостатка в примере make_pipeline нельзя было сделать "ветвления". Например, стадия validation могла бы возвращать либо valid_raw_value (и тогда обработка шла бы по одной ветке), либо invalid_raw_value (и тогда бы обработка шла бы по другой ветке);
- как еще одно следствие этого же недостатка нельзя было сделать так, чтобы стадия могла получать на вход сообщение не одного типа T, а сообщение из некоторого набора типов. Скажем, alarm_detector мог бы получать suspicious_value или time_window_closed;
- не было никакого механизма защиты стадий от перегрузки, поскольку стадии представлялись обычными агентами, сообщения доставлялись обычным образом, а у агентов очереди в принципе неограниченного размера;
- не было возможности добавить какое-то подобие "проактивности" для стадий. Например, сделать так, чтобы стадия, получив входящее сообщение, взвела таймер, а потом среагировала бы на истечение таймера и смогла бы сгенерировать одно или несколько исходящих сообщений.
Т.е. хочется попробовать сделать такой набор высокоуровневых инструментов, который бы позволил описывать сложные конвейеры обработки данных, где стадии могут получать входящие сообщения из нескольких источников, могут в процессе обработки отсылать разные исходящие сообщения (и в разных количествах), могут заказывать таймеры и реагировать на них, будут защищены от перегрузки (например, слишком активно отсылающая исходящие сообщения стадия должна приостанавливаться на send-е).
Первые наметки
Для того, чтобы иметь возможность работать с несколькими входами и выходами на стадиях пайплайнов, придется использовать объекты в качестве стадий. Т.е. если в простой версии make_pipeline можно было использовать обычную функцию, то теперь это уже вряд ли прокатит. По крайней мере будет гораздо проще отталкиваться от того, что стадия -- это объект.
Далее, поскольку стадии могут иметь несколько входящих сообщений и несколько исходящих, то изменится способ получения и отсылки сообщений. Вместо аргументов и возвращаемого значения функции/метода, как это было в простом make_pipeline, предполагается использовать отдельные объекты-слоты (по аналогии с портами в RaftLib). Что дает возможность описывать стадию конвейера, например, таким образом:
struct validator_t : public daglib::stage_t { daglib::input_t<raw_value> in_; daglib::output_t<valid_raw_value> out_valid_; daglib::output_t<invalid_raw_value> out_invalid_; validator_t(daglib::context_t ctx) {...} }; |
Здесь поле типа input_t означает слот для входящего сообщения. Соответственно, поле типа output_t означает слот для исходящего сообщения. И пользователь в своем классе для стадии может объявить столько слотов, сколько ему потребуется.
На каждый входной слот нужно навесить свой обработчик. Предположительно, происходить это будет в конструкторе объекта-стадии посредством метода context_t::bind. Этот метод получает ссылку на объект-стадию, ссылку на входной слот и лямбду-обработчик входящего сообщения:
struct validator_t : public daglib::stage_t { daglib::input_t<raw_value> in_; daglib::output_t<valid_raw_value> out_valid_; daglib::output_t<invalid_raw_value> out_invalid_; validator_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { if(0x7 >= in.m_data.m_high_bits) out_valid_.push(v.m_data); else out_invalid_.push(v.m_data.m_meter_id); }); } }; |
Соответственно, если у стадии несколько входных слотов, то навесить обработчик нужно на каждого из них.
При определении самого пайплайна нужно сперва создать объекты-стадии и определить специальный объект-источник, из которого в пайплайн и будут поступать сообщения:
daglib::default_context_t ctx; auto validator = ctx.stage<validator_t>(); auto converter = ctx.stage<conversion_t>(); auto archiver = ctx.stage<archiver_t>(); auto distributor = ctx.stage<distributor_t>(); auto range_checker = ctx.stage<range_checker_t>(); auto alarm_detector = ctx.stage<alarm_detector_t>(); auto alarm_archiver = ctx.stage<alarm_archiver_t>(); daglib::source_t<raw_value> src; |
После чего стадии пайплайна нужно будет связать друг с другом. Можно делать это совсем по простому, "в лоб":
src >>= validator->in_; validator->out_valid_ >>= converter->in_; converter->out_ >>= archiver->in_; converter->out_ >>= distributor->in_; converter->out_ >>= range_checker->in_; range_checker->out_ >>= alarm_detector->in_; alarm_detector->out_ >>= alarm_archiver->in_; |
Т.е. мы говорим, что входящие сообщения из источника будут идти на входной слот стадии validator. Затем из исходящего слота out_valid стадии validator сообщение пойдет на входной слот стадии converter. И т.д.
Но можно (а может и нужно) упороться и сделать что-то вот такое:
src >> validator->in_[validator->out_valid_] >> converter->in_[converter->out_] >> (archiver->in_ & distributor->in_ & (range_checker->in_[range_checker->out_] >> alarm_detector->in_[alarm_detector->out_] >> alarm_archiver->in_) ); |
Здесь конструкция stage->in[stage->out] означает, что сообщение идет на вход stage->in, а результат обработки этого сообщения затем пойдет на stage->out. Имхо, запись вида:
src >> one->in[one->out] >> two->in[two->out] >> three->in;
Позволит в понятной декларативной форме описывать простые линейные пайплайны. Хотя мне самому в записи stage->in[stage->out] не нравится дублирование stage. Но пока ничего лучше не придумалось.
Поддержка таймеров
Сделать поддержку таймеров можно посредством объявления обычного входного слота, который будет связан не выходным слотом предыдущей стадии, а со специальным источником. Что-то вроде:
struct frequence_checker_t : public daglib::stage_t { // Входной слот для сообщений от предыдущей стадии. daglib::input_t<ordinary_message> in_; // Входной слот для сообщений от таймера. daglib::input_t<timer_message> in_timer_; ... // Источник сообщений от таймера. daglib::timer_source_t<timer_message> timer_; frequence_checker_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { ... // Здесь нужно сообщение от таймера. timer_.schedule(150ms); ... }); ctx.bind(*this, in_timer_, [this](const auto & in) { ... }); // Сразу связываем источник и входящий слот для таймера. timer_ >> in_timer_; } }; |
Входные слоты типа one_of и all_of
В простых случаях входной слот параметризуется всего одним типом. Например, input_t<T> означает, что ожидается сообщение типа T и обработчику сообщения будет передана константная ссылка на объект типа T.
Но могут быть и более сложные ситуации. Например, стадия может ждать одно из нескольких возможных сообщений. Скажем стадия problem_monitor может реагировать на сообщения invalid_raw_value и на alarm_detected. Можно, конечно, создать два простых входных слота -- один для invalid_raw_value, второй для alarm_detected. Но это может вести к дублированию кода и лапшеобразному коду. Поэтому в таком случае пригодился бы слот one_of:
struct problem_monitor_t : public daglib::stage_t { daglib::input_one_of_t<invalid_raw_message, alarm_detected> in_; ... problem_monitor_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const std::variant<invalid_raw_message, alarm_detected> & in) {...}); } }; |
Аналогично, может быть необходимость в слоте типа all_of, т.е. обработчик вызывается только когда все заявленные сообщения в него поступают:
struct completion_handler_t : public daglib::stage_t { daglib::input_all_of_t<branch_A_result, branch_B_result> in_; ... completion_handler_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const std::tuple<branch_A_result, branch_B_result> & in) { ... }); } }; |
Только тут уже обработчик получает не std::variant, в std::tuple.
Привязывание стадий к разным контекстам
В общем-то, планируется сделать по аналогии с тем, как это было реализовано в простом make_pipeline: за каждым объектом-стадией скрывается SObjectizer-овский агент. Что позволяет указать для стадии dispatcher binder и этот binder привяжет соответствующего агента к нужному рабочему контексту.
Как именно dispatcher binder будет передаваться в объект-стадию и в какой именно момент будут создаваться агенты для каждой стадии -- это открытый вопрос. Надеюсь, что ответы на него найдутся в процессе экспериментирования.
Защита стадий от перегрузки
Дабы защитить стадии от перегрузки (т.е. когда предыдущая стадия конвейера генерирует больше исходящих сообщений, чем следующая стадия способна обработать) я планирую задействовать SObjectizer-овские mchain-ы фиксированной длины для входных слотов. Т.е. за каждым слотом будет скрываться mchain. Поэтому попытка записать сообщение в уже полный входной слот следующей стадии будет приостанавливать отправителя сообщения. Тогда как сообщения агенту, стоящему за объектом-стадией, будут отсылаться только когда в каком-то слоте появляются данные для обработки. Поэтому сообщений агенту будет отсылаться меньше, чем будет находится в mchain-е (при интенсивном трафике).
Тут есть подводные камни. В частности, mchain предлагает такие реакции на попытку записать сообщение в полный канал, как drop_oldest и drop_newest. Вероятно, для пайплайнов эти опции будут недоступны. В общем, тут так же есть над чем подумать.
Как же это все может выглядеть?
Например, вот так. За основу взят пример, описанный в статье "A declarative data-processing pipeline on top of actors? Why not?", но не полностью, только основные моменты:
struct validator_t : public daglib::stage_t { daglib::input_t<raw_value> in_; daglib::output_t<valid_raw_value> out_valid_; daglib::output_t<invalid_raw_value> out_invalid_; validator_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { if(0x7 >= in.m_data.m_high_bits) out_valid_.push(v.m_data); else out_invalid_.push(v.m_data.m_meter_id); }); } }; struct converter_t : public daglib::stage_t { daglib::input_t<valid_raw_value> in_; daglib::output_t<sensor_value> out_; converter_t(daglib_t::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { out_.push( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >(v.m_data.m_high_bits) << 8) + v.m_data.m_low_bits) }); }); }; struct archiver_t : public daglib::stage_t { daglib::input_t<sensor_value> in_; daglib::output_t<archiving_failure> out_failure_; archiver_t(daglib_t::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { std::cout << "archiving: " << in << std::endl; }); } }; struct distributor_t : public daglib::stage_t { daglib::input_t<sensor_value> in_; distributor_t(daglib_t::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { std::cout << "distributing: " << in << std::endl; }); } }; struct range_checker_t : public daglib::stage_t { daglib::input_t<sensor_value> in_; daglib::output_t<suspicious_value> out_; range_checker_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) { if(in.m_data.m_measure >= 45.0f) out_.push(v.m_data); }); } }; struct alarm_detector_t : public daglib::stage_t { daglib::input_t<suspicious_value> in_; daglib::output_t<alarm_detected> out_; std::optional<suspicious_value> previous_; alarm_detector_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) {...}); } }; struct alarm_archiver_t : public daglib::stage_t { daglib::input_t<alarm_detected> in_; alarm_archiver_t(daglib::context_t ctx) { ctx.bind(*this, in_, [this](const auto & in) {...}); } }; daglib::default_context_t ctx; auto validator = ctx.stage<validator_t>(); auto converter = ctx.stage<conversion_t>(); auto archiver = ctx.stage<archiver_t>(); auto distributor = ctx.stage<distributor_t>(); auto range_checker = ctx.stage<range_checker_t>(); auto alarm_detector = ctx.stage<alarm_detector_t>(); auto alarm_archiver = ctx.stage<alarm_archiver_t>(); daglib::source_t<raw_value> src; src >> validator->in_[validator->out_valid_] >> converter->in_[converter->out_] >> (archiver->in_ & distributor->in_ & (range_checker->in_[range_checker->out_] >> alarm_detector->in_[alarm_detector->out_] >> alarm_archiver->in_) ); |
Можно ли как-то этот страх и ужас упростить?
В принципе, для простых стадий пайплайна, у которых есть один вход и один выход, можно сделать штатный класс simple_stage_t, который позволит писать так:
struct converter_t : public daglib::simple_stage_t<valid_raw_value, sensor_value> { converter_t(daglib_t::context_t ctx) { ctx.bind(*this, in(), [this](const auto & in) { out().push( calculated_measure{ v.m_data.m_meter_id, 0.5f * ((static_cast< uint16_t >(v.m_data.m_high_bits) << 8) + v.m_data.m_low_bits) }); }); }; |
Заключение
Вот такой эксперимент хочется провести на следующей неделе. Если время и ресурсы позволят, конечно. Если получится, то затем будет подготовлена еще одна статья для Хабра. Вероятно, снова на слабом подобии английского ;)
У кого будет желание поделиться мыслями и соображениями на этот счет -- не сдерживайте себя, плиз. А если кто-то сможет привести пример более-менее жизненного графа обработки данных, но не слишком навороченного, чтобы его можно было достаточно лаконично объяснить читателю в итоговой статье, то вообще было бы замечательно.
Комментариев нет:
Отправить комментарий